diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go index 26eacc2b..1fd9ad5c 100644 --- a/util/workqueue/delaying_queue.go +++ b/util/workqueue/delaying_queue.go @@ -36,8 +36,8 @@ type DelayingInterface interface { // NewDelayingQueue constructs a new workqueue with delayed queuing ability. // NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use // NewNamedDelayingQueue instead. -func NewDelayingQueue() DelayingInterface { - return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") +func NewDelayingQueue(opts ...QueueOption) DelayingInterface { + return NewDelayingQueueWithCustomClock(clock.RealClock{}, "", opts...) } // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to @@ -47,14 +47,14 @@ func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface } // NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability -func NewNamedDelayingQueue(name string) DelayingInterface { +func NewNamedDelayingQueue(name string, opts ...QueueOption) DelayingInterface { return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) } // NewDelayingQueueWithCustomClock constructs a new named workqueue // with ability to inject real or fake clock for testing purposes -func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface { - return newDelayingQueue(clock, NewNamed(name), name) +func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string, opts ...QueueOption) DelayingInterface { + return newDelayingQueue(clock, NewNamed(name, opts...), name) } func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType { diff --git a/util/workqueue/metrics_test.go b/util/workqueue/metrics_test.go index bfc9c1a3..a35be035 100644 --- a/util/workqueue/metrics_test.go +++ b/util/workqueue/metrics_test.go @@ -276,3 +276,113 @@ func TestMetrics(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } } + +func TestPerQueueMetrics(t *testing.T) { + // TODO(austince): refactor TestMetrics to make this test just another case + mp := testMetricsProvider{} + t0 := time.Unix(0, 0) + c := testingclock.NewFakeClock(t0) + + q := newNamedQueueWithCustomClock(c, "test", time.Millisecond, WithMetricsProvider(&mp)) + defer q.ShutDown() + for !c.HasWaiters() { + // Wait for the go routine to call NewTicker() + time.Sleep(time.Millisecond) + } + + q.Add("foo") + if e, a := 1.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(50 * time.Microsecond) + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 5e-05, mp.latency.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.latency.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Add it back while processing; multiple adds of the same item are + // de-duped. + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + q.Add(i) + if e, a := 2.0, mp.adds.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + c.Step(25 * time.Microsecond) + + // Finish it up + q.Done(i) + + if e, a := 2.5e-05, mp.duration.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1, mp.duration.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // One thing remains in the queue + if e, a := 1.0, mp.depth.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + if e, a := 2.5e-05, mp.latency.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.latency.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // use a channel to ensure we don't look at the metric before it's + // been set. + ch := make(chan struct{}, 1) + longestCh := make(chan struct{}, 1) + mp.unfinished.notifyCh = ch + mp.longest.notifyCh = longestCh + c.Step(time.Millisecond) + <-ch + mp.unfinished.notifyCh = nil + if e, a := .001, mp.unfinished.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + <-longestCh + mp.longest.notifyCh = nil + if e, a := .001, mp.longest.gaugeValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + // Finish that one up + q.Done(i) + if e, a := .001, mp.duration.observationValue(); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, mp.duration.observationCount(); e != a { + t.Errorf("expected %v, got %v", e, a) + } +} diff --git a/util/workqueue/options.go b/util/workqueue/options.go new file mode 100644 index 00000000..15c2cdd4 --- /dev/null +++ b/util/workqueue/options.go @@ -0,0 +1,35 @@ +package workqueue + +type QueueConfig struct { + metricsProvider MetricsProvider +} + +// QueueOption is an interface for applying queue configuration options. +type QueueOption interface { + apply(*QueueConfig) +} + +type optionFunc func(*QueueConfig) + +func (fn optionFunc) apply(config *QueueConfig) { + fn(config) +} + +var _ QueueOption = optionFunc(nil) + +// NewConfig creates a new QueueConfig and applies all the given options. +func NewConfig(opts ...QueueOption) *QueueConfig { + config := &QueueConfig{} + for _, o := range opts { + o.apply(config) + } + return config +} + +// WithMetricsProvider allows specifying a metrics provider to use for the queue +// instead of the global provider. +func WithMetricsProvider(provider MetricsProvider) QueueOption { + return optionFunc(func(config *QueueConfig) { + config.metricsProvider = provider + }) +} diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index 6f706326..5c2890b3 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -34,16 +34,33 @@ type Interface interface { } // New constructs a new work queue (see the package comment). -func New() *Type { - return NewNamed("") +func New(opts ...QueueOption) *Type { + return NewNamed("", opts...) } -func NewNamed(name string) *Type { - rc := clock.RealClock{} +func NewNamed(name string, opts ...QueueOption) *Type { + return newNamedQueueWithCustomClock(clock.RealClock{}, name, defaultUnfinishedWorkUpdatePeriod, opts...) +} + +// newNamedQueueWithCustomClock constructs a new named workqueue +// with ability to inject real or fake clock for testing purposes +// TODO(austince): should WithUpdatePeriod be a QueueOption? Could this func then be public? +func newNamedQueueWithCustomClock(clock clock.WithTicker, name string, updatePeriod time.Duration, opts ...QueueOption) *Type { + config := NewConfig(opts...) + + var metricsFactory *queueMetricsFactory + if config.metricsProvider != nil { + metricsFactory = &queueMetricsFactory{ + metricsProvider: config.metricsProvider, + } + } else { + metricsFactory = &globalMetricsFactory + } + return newQueue( - rc, - globalMetricsFactory.newQueueMetrics(name, rc), - defaultUnfinishedWorkUpdatePeriod, + clock, + metricsFactory.newQueueMetrics(name, clock), + updatePeriod, ) } diff --git a/util/workqueue/rate_limiting_queue.go b/util/workqueue/rate_limiting_queue.go index 91cd33f1..39fddc0a 100644 --- a/util/workqueue/rate_limiting_queue.go +++ b/util/workqueue/rate_limiting_queue.go @@ -36,16 +36,16 @@ type RateLimitingInterface interface { // Remember to call Forget! If you don't, you may end up tracking failures forever. // NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use // NewNamedRateLimitingQueue instead. -func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { +func NewRateLimitingQueue(rateLimiter RateLimiter, opts ...QueueOption) RateLimitingInterface { return &rateLimitingType{ - DelayingInterface: NewDelayingQueue(), + DelayingInterface: NewDelayingQueue(opts...), rateLimiter: rateLimiter, } } -func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface { +func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string, opts ...QueueOption) RateLimitingInterface { return &rateLimitingType{ - DelayingInterface: NewNamedDelayingQueue(name), + DelayingInterface: NewNamedDelayingQueue(name, opts...), rateLimiter: rateLimiter, } }