diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go index 26eacc2ba77..c1df7203021 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go @@ -33,38 +33,81 @@ type DelayingInterface interface { AddAfter(item interface{}, duration time.Duration) } +// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface. +type DelayingQueueConfig struct { + // Name for the queue. If unnamed, the metrics will not be registered. + Name string + + // MetricsProvider optionally allows specifying a metrics provider to use for the queue + // instead of the global provider. + MetricsProvider MetricsProvider + + // Clock optionally allows injecting a real or fake clock for testing purposes. + Clock clock.WithTicker + + // Queue optionally allows injecting custom queue Interface instead of the default one. + Queue Interface +} + // NewDelayingQueue constructs a new workqueue with delayed queuing ability. // NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use -// NewNamedDelayingQueue instead. +// NewDelayingQueueWithConfig instead and specify a name. func NewDelayingQueue() DelayingInterface { - return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") + return NewDelayingQueueWithConfig(DelayingQueueConfig{}) +} + +// NewDelayingQueueWithConfig constructs a new workqueue with options to +// customize different properties. +func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface { + if config.Clock == nil { + config.Clock = clock.RealClock{} + } + + if config.Queue == nil { + config.Queue = NewWithConfig(QueueConfig{ + Name: config.Name, + MetricsProvider: config.MetricsProvider, + Clock: config.Clock, + }) + } + + return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider) } // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to // inject custom queue Interface instead of the default one +// Deprecated: Use NewDelayingQueueWithConfig instead. func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface { - return newDelayingQueue(clock.RealClock{}, q, name) + return NewDelayingQueueWithConfig(DelayingQueueConfig{ + Name: name, + Queue: q, + }) } -// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability +// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability. +// Deprecated: Use NewDelayingQueueWithConfig instead. func NewNamedDelayingQueue(name string) DelayingInterface { - return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) + return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name}) } // NewDelayingQueueWithCustomClock constructs a new named workqueue -// with ability to inject real or fake clock for testing purposes +// with ability to inject real or fake clock for testing purposes. +// Deprecated: Use NewDelayingQueueWithConfig instead. func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface { - return newDelayingQueue(clock, NewNamed(name), name) + return NewDelayingQueueWithConfig(DelayingQueueConfig{ + Name: name, + Clock: clock, + }) } -func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType { +func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType { ret := &delayingType{ Interface: q, clock: clock, heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), - metrics: newRetryMetrics(name), + metrics: newRetryMetrics(name, provider), } go ret.waitingLoop() diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go index 0967a7558f9..3c589e17c9f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go @@ -29,7 +29,7 @@ import ( func TestSimpleQueue(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" @@ -71,7 +71,7 @@ func TestSimpleQueue(t *testing.T) { func TestDeduping(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" @@ -127,7 +127,7 @@ func TestDeduping(t *testing.T) { func TestAddTwoFireEarly(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" second := "bar" @@ -176,7 +176,7 @@ func TestAddTwoFireEarly(t *testing.T) { func TestCopyShifting(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" second := "bar" @@ -214,7 +214,7 @@ func TestCopyShifting(t *testing.T) { func BenchmarkDelayingQueue_AddAfter(b *testing.B) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) // Add items for n := 0; n < b.N; n++ { diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index 4b0a69616d3..f012ccc5548 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -244,13 +244,18 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu } } -func newRetryMetrics(name string) retryMetrics { +func newRetryMetrics(name string, provider MetricsProvider) retryMetrics { var ret *defaultRetryMetrics if len(name) == 0 { return ret } + + if provider == nil { + provider = globalMetricsFactory.metricsProvider + } + return &defaultRetryMetrics{ - retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name), + retries: provider.NewRetriesMetric(name), } } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index bfc9c1a3a6e..3c3cc36111a 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -171,9 +171,12 @@ func TestMetrics(t *testing.T) { mp := testMetricsProvider{} t0 := time.Unix(0, 0) c := testingclock.NewFakeClock(t0) - mf := queueMetricsFactory{metricsProvider: &mp} - m := mf.newQueueMetrics("test", c) - q := newQueue(c, m, time.Millisecond) + config := QueueConfig{ + Name: "test", + Clock: c, + MetricsProvider: &mp, + } + q := newQueueWithConfig(config, time.Millisecond) defer q.ShutDown() for !c.HasWaiters() { // Wait for the go routine to call NewTicker() diff --git a/staging/src/k8s.io/client-go/util/workqueue/queue.go b/staging/src/k8s.io/client-go/util/workqueue/queue.go index 6f7063269fb..380c0645526 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/queue.go @@ -33,17 +33,60 @@ type Interface interface { ShuttingDown() bool } -// New constructs a new work queue (see the package comment). -func New() *Type { - return NewNamed("") +// QueueConfig specifies optional configurations to customize an Interface. +type QueueConfig struct { + // Name for the queue. If unnamed, the metrics will not be registered. + Name string + + // MetricsProvider optionally allows specifying a metrics provider to use for the queue + // instead of the global provider. + MetricsProvider MetricsProvider + + // Clock ability to inject real or fake clock for testing purposes. + Clock clock.WithTicker } +// New constructs a new work queue (see the package comment). +func New() *Type { + return NewWithConfig(QueueConfig{ + Name: "", + }) +} + +// NewWithConfig constructs a new workqueue with ability to +// customize different properties. +func NewWithConfig(config QueueConfig) *Type { + return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod) +} + +// NewNamed creates a new named queue. +// Deprecated: Use NewWithConfig instead. func NewNamed(name string) *Type { - rc := clock.RealClock{} + return NewWithConfig(QueueConfig{ + Name: name, + }) +} + +// newQueueWithConfig constructs a new named workqueue +// with the ability to customize different properties for testing purposes +func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type { + var metricsFactory *queueMetricsFactory + if config.MetricsProvider != nil { + metricsFactory = &queueMetricsFactory{ + metricsProvider: config.MetricsProvider, + } + } else { + metricsFactory = &globalMetricsFactory + } + + if config.Clock == nil { + config.Clock = clock.RealClock{} + } + return newQueue( - rc, - globalMetricsFactory.newQueueMetrics(name, rc), - defaultUnfinishedWorkUpdatePeriod, + config.Clock, + metricsFactory.newQueueMetrics(config.Name, config.Clock), + updatePeriod, ) } diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go index 91cd33f193b..3e4016fb04f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go @@ -16,6 +16,8 @@ limitations under the License. package workqueue +import "k8s.io/utils/clock" + // RateLimitingInterface is an interface that rate limits items being added to the queue. type RateLimitingInterface interface { DelayingInterface @@ -32,29 +34,68 @@ type RateLimitingInterface interface { NumRequeues(item interface{}) int } +// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface. + +type RateLimitingQueueConfig struct { + // Name for the queue. If unnamed, the metrics will not be registered. + Name string + + // MetricsProvider optionally allows specifying a metrics provider to use for the queue + // instead of the global provider. + MetricsProvider MetricsProvider + + // Clock optionally allows injecting a real or fake clock for testing purposes. + Clock clock.WithTicker + + // DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one. + DelayingQueue DelayingInterface +} + // NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability // Remember to call Forget! If you don't, you may end up tracking failures forever. // NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use -// NewNamedRateLimitingQueue instead. +// NewRateLimitingQueueWithConfig instead and specify a name. func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { + return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{}) +} + +// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability +// with options to customize different properties. +// Remember to call Forget! If you don't, you may end up tracking failures forever. +func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface { + if config.Clock == nil { + config.Clock = clock.RealClock{} + } + + if config.DelayingQueue == nil { + config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{ + Name: config.Name, + MetricsProvider: config.MetricsProvider, + Clock: config.Clock, + }) + } + return &rateLimitingType{ - DelayingInterface: NewDelayingQueue(), + DelayingInterface: config.DelayingQueue, rateLimiter: rateLimiter, } } +// NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability. +// Deprecated: Use NewRateLimitingQueueWithConfig instead. func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface { - return &rateLimitingType{ - DelayingInterface: NewNamedDelayingQueue(name), - rateLimiter: rateLimiter, - } + return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{ + Name: name, + }) } +// NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability +// with the option to inject a custom delaying queue instead of the default one. +// Deprecated: Use NewRateLimitingQueueWithConfig instead. func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface { - return &rateLimitingType{ - DelayingInterface: di, - rateLimiter: rateLimiter, - } + return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{ + DelayingQueue: di, + }) } // rateLimitingType wraps an Interface and provides rateLimited re-enquing diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go index 84d8495dda3..77e161307e6 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue_test.go @@ -33,7 +33,7 @@ func TestRateLimitingQueue(t *testing.T) { heartbeat: fakeClock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), - metrics: newRetryMetrics(""), + metrics: newRetryMetrics("", nil), } queue.DelayingInterface = delayingQueue