diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go index 1ace76711c4..fdf923e1a35 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go @@ -33,28 +33,71 @@ type DelayingInterface interface { AddAfter(item interface{}, duration time.Duration) } +// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface. +type DelayingQueueConfig struct { + // Name for the queue. If unnamed, the metrics will not be registered. + Name string + + // MetricsProvider optionally allows specifying a metrics provider to use for the queue + // instead of the global provider. + MetricsProvider MetricsProvider + + // Clock optionally allows injecting a real or fake clock for testing purposes. + Clock clock.WithTicker + + // Queue optionally allows injecting custom queue Interface instead of the default one. + Queue Interface +} + // NewDelayingQueue constructs a new workqueue with delayed queuing ability. // NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use -// NewNamedDelayingQueue instead. +// NewDelayingQueueWithConfig instead and specify a name. func NewDelayingQueue() DelayingInterface { - return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") + return NewDelayingQueueWithConfig(DelayingQueueConfig{}) +} + +// NewDelayingQueueWithConfig constructs a new workqueue with options to +// customize different properties. +func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface { + if config.Clock == nil { + config.Clock = clock.RealClock{} + } + + if config.Queue == nil { + config.Queue = NewWithConfig(QueueConfig{ + Name: config.Name, + MetricsProvider: config.MetricsProvider, + Clock: config.Clock, + }) + } + + return newDelayingQueue(config.Clock, config.Queue, config.Name) } // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to // inject custom queue Interface instead of the default one +// Deprecated: Use NewDelayingQueueWithConfig instead. func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface { - return newDelayingQueue(clock.RealClock{}, q, name) + return NewDelayingQueueWithConfig(DelayingQueueConfig{ + Name: name, + Queue: q, + }) } -// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability -func NewNamedDelayingQueue(name string, opts ...QueueOption) DelayingInterface { - return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) +// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability. +// Deprecated: Use NewDelayingQueueWithConfig instead. +func NewNamedDelayingQueue(name string) DelayingInterface { + return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name}) } // NewDelayingQueueWithCustomClock constructs a new named workqueue -// with ability to inject real or fake clock for testing purposes -func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string, opts ...QueueOption) DelayingInterface { - return newDelayingQueue(clock, NewNamed(name, opts...), name) +// with ability to inject real or fake clock for testing purposes. +// Deprecated: Use NewDelayingQueueWithConfig instead. +func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface { + return NewDelayingQueueWithConfig(DelayingQueueConfig{ + Name: name, + Clock: clock, + }) } func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType { diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go index 0967a7558f9..3c589e17c9f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go @@ -29,7 +29,7 @@ import ( func TestSimpleQueue(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" @@ -71,7 +71,7 @@ func TestSimpleQueue(t *testing.T) { func TestDeduping(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" @@ -127,7 +127,7 @@ func TestDeduping(t *testing.T) { func TestAddTwoFireEarly(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" second := "bar" @@ -176,7 +176,7 @@ func TestAddTwoFireEarly(t *testing.T) { func TestCopyShifting(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) first := "foo" second := "bar" @@ -214,7 +214,7 @@ func TestCopyShifting(t *testing.T) { func BenchmarkDelayingQueue_AddAfter(b *testing.B) { fakeClock := testingclock.NewFakeClock(time.Now()) - q := NewDelayingQueueWithCustomClock(fakeClock, "") + q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock}) // Add items for n := 0; n < b.N; n++ { diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index 6102345155f..3c3cc36111a 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -171,7 +171,12 @@ func TestMetrics(t *testing.T) { mp := testMetricsProvider{} t0 := time.Unix(0, 0) c := testingclock.NewFakeClock(t0) - q := newNamedQueueWithCustomClock(c, "test", time.Millisecond, WithMetricsProvider(&mp)) + config := QueueConfig{ + Name: "test", + Clock: c, + MetricsProvider: &mp, + } + q := newQueueWithConfig(config, time.Millisecond) defer q.ShutDown() for !c.HasWaiters() { // Wait for the go routine to call NewTicker() diff --git a/staging/src/k8s.io/client-go/util/workqueue/options.go b/staging/src/k8s.io/client-go/util/workqueue/options.go deleted file mode 100644 index cf33a9b0bed..00000000000 --- a/staging/src/k8s.io/client-go/util/workqueue/options.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright 2022 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package workqueue - -type QueueConfig struct { - metricsProvider MetricsProvider -} - -// QueueOption is an interface for applying queue configuration options. -type QueueOption interface { - apply(*QueueConfig) -} - -type optionFunc func(*QueueConfig) - -func (fn optionFunc) apply(config *QueueConfig) { - fn(config) -} - -var _ QueueOption = optionFunc(nil) - -// NewConfig creates a new QueueConfig and applies all the given options. -func NewConfig(opts ...QueueOption) *QueueConfig { - config := &QueueConfig{} - for _, o := range opts { - o.apply(config) - } - return config -} - -// WithMetricsProvider allows specifying a metrics provider to use for the queue -// instead of the global provider. -func WithMetricsProvider(provider MetricsProvider) QueueOption { - return optionFunc(func(config *QueueConfig) { - config.metricsProvider = provider - }) -} diff --git a/staging/src/k8s.io/client-go/util/workqueue/queue.go b/staging/src/k8s.io/client-go/util/workqueue/queue.go index 99853cfa672..380c0645526 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/queue.go @@ -33,32 +33,59 @@ type Interface interface { ShuttingDown() bool } +// QueueConfig specifies optional configurations to customize an Interface. +type QueueConfig struct { + // Name for the queue. If unnamed, the metrics will not be registered. + Name string + + // MetricsProvider optionally allows specifying a metrics provider to use for the queue + // instead of the global provider. + MetricsProvider MetricsProvider + + // Clock ability to inject real or fake clock for testing purposes. + Clock clock.WithTicker +} + // New constructs a new work queue (see the package comment). -func New(opts ...QueueOption) *Type { - return NewNamed("", opts...) +func New() *Type { + return NewWithConfig(QueueConfig{ + Name: "", + }) } -func NewNamed(name string, opts ...QueueOption) *Type { - return newNamedQueueWithCustomClock(clock.RealClock{}, name, defaultUnfinishedWorkUpdatePeriod, opts...) +// NewWithConfig constructs a new workqueue with ability to +// customize different properties. +func NewWithConfig(config QueueConfig) *Type { + return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod) } -// newNamedQueueWithCustomClock constructs a new named workqueue -// with ability to inject real or fake clock for testing purposes -func newNamedQueueWithCustomClock(clock clock.WithTicker, name string, updatePeriod time.Duration, opts ...QueueOption) *Type { - config := NewConfig(opts...) +// NewNamed creates a new named queue. +// Deprecated: Use NewWithConfig instead. +func NewNamed(name string) *Type { + return NewWithConfig(QueueConfig{ + Name: name, + }) +} +// newQueueWithConfig constructs a new named workqueue +// with the ability to customize different properties for testing purposes +func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type { var metricsFactory *queueMetricsFactory - if config.metricsProvider != nil { + if config.MetricsProvider != nil { metricsFactory = &queueMetricsFactory{ - metricsProvider: config.metricsProvider, + metricsProvider: config.MetricsProvider, } } else { metricsFactory = &globalMetricsFactory } + if config.Clock == nil { + config.Clock = clock.RealClock{} + } + return newQueue( - clock, - metricsFactory.newQueueMetrics(name, clock), + config.Clock, + metricsFactory.newQueueMetrics(config.Name, config.Clock), updatePeriod, ) } diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go index 6b07b4a36b9..3e4016fb04f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go @@ -16,6 +16,8 @@ limitations under the License. package workqueue +import "k8s.io/utils/clock" + // RateLimitingInterface is an interface that rate limits items being added to the queue. type RateLimitingInterface interface { DelayingInterface @@ -32,29 +34,68 @@ type RateLimitingInterface interface { NumRequeues(item interface{}) int } +// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface. + +type RateLimitingQueueConfig struct { + // Name for the queue. If unnamed, the metrics will not be registered. + Name string + + // MetricsProvider optionally allows specifying a metrics provider to use for the queue + // instead of the global provider. + MetricsProvider MetricsProvider + + // Clock optionally allows injecting a real or fake clock for testing purposes. + Clock clock.WithTicker + + // DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one. + DelayingQueue DelayingInterface +} + // NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability // Remember to call Forget! If you don't, you may end up tracking failures forever. // NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use -// NewNamedRateLimitingQueue instead. +// NewRateLimitingQueueWithConfig instead and specify a name. func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { + return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{}) +} + +// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability +// with options to customize different properties. +// Remember to call Forget! If you don't, you may end up tracking failures forever. +func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface { + if config.Clock == nil { + config.Clock = clock.RealClock{} + } + + if config.DelayingQueue == nil { + config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{ + Name: config.Name, + MetricsProvider: config.MetricsProvider, + Clock: config.Clock, + }) + } + return &rateLimitingType{ - DelayingInterface: NewDelayingQueue(), + DelayingInterface: config.DelayingQueue, rateLimiter: rateLimiter, } } -func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string, opts ...QueueOption) RateLimitingInterface { - return &rateLimitingType{ - DelayingInterface: NewNamedDelayingQueue(name, opts...), - rateLimiter: rateLimiter, - } +// NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability. +// Deprecated: Use NewRateLimitingQueueWithConfig instead. +func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface { + return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{ + Name: name, + }) } +// NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability +// with the option to inject a custom delaying queue instead of the default one. +// Deprecated: Use NewRateLimitingQueueWithConfig instead. func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface { - return &rateLimitingType{ - DelayingInterface: di, - rateLimiter: rateLimiter, - } + return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{ + DelayingQueue: di, + }) } // rateLimitingType wraps an Interface and provides rateLimited re-enquing