mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
Unify queue constructors, deprecate current constructors
This commit is contained in:
parent
9e83e7e975
commit
fea83b7824
@ -33,28 +33,71 @@ type DelayingInterface interface {
|
|||||||
AddAfter(item interface{}, duration time.Duration)
|
AddAfter(item interface{}, duration time.Duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
|
||||||
|
type DelayingQueueConfig struct {
|
||||||
|
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||||
|
Name string
|
||||||
|
|
||||||
|
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
|
||||||
|
// instead of the global provider.
|
||||||
|
MetricsProvider MetricsProvider
|
||||||
|
|
||||||
|
// Clock optionally allows injecting a real or fake clock for testing purposes.
|
||||||
|
Clock clock.WithTicker
|
||||||
|
|
||||||
|
// Queue optionally allows injecting custom queue Interface instead of the default one.
|
||||||
|
Queue Interface
|
||||||
|
}
|
||||||
|
|
||||||
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
|
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
|
||||||
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
|
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||||
// NewNamedDelayingQueue instead.
|
// NewDelayingQueueWithConfig instead and specify a name.
|
||||||
func NewDelayingQueue() DelayingInterface {
|
func NewDelayingQueue() DelayingInterface {
|
||||||
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
|
return NewDelayingQueueWithConfig(DelayingQueueConfig{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDelayingQueueWithConfig constructs a new workqueue with options to
|
||||||
|
// customize different properties.
|
||||||
|
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
|
||||||
|
if config.Clock == nil {
|
||||||
|
config.Clock = clock.RealClock{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Queue == nil {
|
||||||
|
config.Queue = NewWithConfig(QueueConfig{
|
||||||
|
Name: config.Name,
|
||||||
|
MetricsProvider: config.MetricsProvider,
|
||||||
|
Clock: config.Clock,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return newDelayingQueue(config.Clock, config.Queue, config.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
|
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
|
||||||
// inject custom queue Interface instead of the default one
|
// inject custom queue Interface instead of the default one
|
||||||
|
// Deprecated: Use NewDelayingQueueWithConfig instead.
|
||||||
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
|
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
|
||||||
return newDelayingQueue(clock.RealClock{}, q, name)
|
return NewDelayingQueueWithConfig(DelayingQueueConfig{
|
||||||
|
Name: name,
|
||||||
|
Queue: q,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
|
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
|
||||||
func NewNamedDelayingQueue(name string, opts ...QueueOption) DelayingInterface {
|
// Deprecated: Use NewDelayingQueueWithConfig instead.
|
||||||
return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
|
func NewNamedDelayingQueue(name string) DelayingInterface {
|
||||||
|
return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDelayingQueueWithCustomClock constructs a new named workqueue
|
// NewDelayingQueueWithCustomClock constructs a new named workqueue
|
||||||
// with ability to inject real or fake clock for testing purposes
|
// with ability to inject real or fake clock for testing purposes.
|
||||||
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string, opts ...QueueOption) DelayingInterface {
|
// Deprecated: Use NewDelayingQueueWithConfig instead.
|
||||||
return newDelayingQueue(clock, NewNamed(name, opts...), name)
|
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
|
||||||
|
return NewDelayingQueueWithConfig(DelayingQueueConfig{
|
||||||
|
Name: name,
|
||||||
|
Clock: clock,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
|
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
|
|
||||||
func TestSimpleQueue(t *testing.T) {
|
func TestSimpleQueue(t *testing.T) {
|
||||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||||
q := NewDelayingQueueWithCustomClock(fakeClock, "")
|
q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
|
|
||||||
@ -71,7 +71,7 @@ func TestSimpleQueue(t *testing.T) {
|
|||||||
|
|
||||||
func TestDeduping(t *testing.T) {
|
func TestDeduping(t *testing.T) {
|
||||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||||
q := NewDelayingQueueWithCustomClock(fakeClock, "")
|
q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
|
|
||||||
@ -127,7 +127,7 @@ func TestDeduping(t *testing.T) {
|
|||||||
|
|
||||||
func TestAddTwoFireEarly(t *testing.T) {
|
func TestAddTwoFireEarly(t *testing.T) {
|
||||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||||
q := NewDelayingQueueWithCustomClock(fakeClock, "")
|
q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
second := "bar"
|
second := "bar"
|
||||||
@ -176,7 +176,7 @@ func TestAddTwoFireEarly(t *testing.T) {
|
|||||||
|
|
||||||
func TestCopyShifting(t *testing.T) {
|
func TestCopyShifting(t *testing.T) {
|
||||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||||
q := NewDelayingQueueWithCustomClock(fakeClock, "")
|
q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
|
||||||
|
|
||||||
first := "foo"
|
first := "foo"
|
||||||
second := "bar"
|
second := "bar"
|
||||||
@ -214,7 +214,7 @@ func TestCopyShifting(t *testing.T) {
|
|||||||
|
|
||||||
func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
|
func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
|
||||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||||
q := NewDelayingQueueWithCustomClock(fakeClock, "")
|
q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
|
||||||
|
|
||||||
// Add items
|
// Add items
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
|
@ -171,7 +171,12 @@ func TestMetrics(t *testing.T) {
|
|||||||
mp := testMetricsProvider{}
|
mp := testMetricsProvider{}
|
||||||
t0 := time.Unix(0, 0)
|
t0 := time.Unix(0, 0)
|
||||||
c := testingclock.NewFakeClock(t0)
|
c := testingclock.NewFakeClock(t0)
|
||||||
q := newNamedQueueWithCustomClock(c, "test", time.Millisecond, WithMetricsProvider(&mp))
|
config := QueueConfig{
|
||||||
|
Name: "test",
|
||||||
|
Clock: c,
|
||||||
|
MetricsProvider: &mp,
|
||||||
|
}
|
||||||
|
q := newQueueWithConfig(config, time.Millisecond)
|
||||||
defer q.ShutDown()
|
defer q.ShutDown()
|
||||||
for !c.HasWaiters() {
|
for !c.HasWaiters() {
|
||||||
// Wait for the go routine to call NewTicker()
|
// Wait for the go routine to call NewTicker()
|
||||||
|
@ -1,51 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2022 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package workqueue
|
|
||||||
|
|
||||||
type QueueConfig struct {
|
|
||||||
metricsProvider MetricsProvider
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueueOption is an interface for applying queue configuration options.
|
|
||||||
type QueueOption interface {
|
|
||||||
apply(*QueueConfig)
|
|
||||||
}
|
|
||||||
|
|
||||||
type optionFunc func(*QueueConfig)
|
|
||||||
|
|
||||||
func (fn optionFunc) apply(config *QueueConfig) {
|
|
||||||
fn(config)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ QueueOption = optionFunc(nil)
|
|
||||||
|
|
||||||
// NewConfig creates a new QueueConfig and applies all the given options.
|
|
||||||
func NewConfig(opts ...QueueOption) *QueueConfig {
|
|
||||||
config := &QueueConfig{}
|
|
||||||
for _, o := range opts {
|
|
||||||
o.apply(config)
|
|
||||||
}
|
|
||||||
return config
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithMetricsProvider allows specifying a metrics provider to use for the queue
|
|
||||||
// instead of the global provider.
|
|
||||||
func WithMetricsProvider(provider MetricsProvider) QueueOption {
|
|
||||||
return optionFunc(func(config *QueueConfig) {
|
|
||||||
config.metricsProvider = provider
|
|
||||||
})
|
|
||||||
}
|
|
@ -33,32 +33,59 @@ type Interface interface {
|
|||||||
ShuttingDown() bool
|
ShuttingDown() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueConfig specifies optional configurations to customize an Interface.
|
||||||
|
type QueueConfig struct {
|
||||||
|
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||||
|
Name string
|
||||||
|
|
||||||
|
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
|
||||||
|
// instead of the global provider.
|
||||||
|
MetricsProvider MetricsProvider
|
||||||
|
|
||||||
|
// Clock ability to inject real or fake clock for testing purposes.
|
||||||
|
Clock clock.WithTicker
|
||||||
|
}
|
||||||
|
|
||||||
// New constructs a new work queue (see the package comment).
|
// New constructs a new work queue (see the package comment).
|
||||||
func New(opts ...QueueOption) *Type {
|
func New() *Type {
|
||||||
return NewNamed("", opts...)
|
return NewWithConfig(QueueConfig{
|
||||||
|
Name: "",
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNamed(name string, opts ...QueueOption) *Type {
|
// NewWithConfig constructs a new workqueue with ability to
|
||||||
return newNamedQueueWithCustomClock(clock.RealClock{}, name, defaultUnfinishedWorkUpdatePeriod, opts...)
|
// customize different properties.
|
||||||
|
func NewWithConfig(config QueueConfig) *Type {
|
||||||
|
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newNamedQueueWithCustomClock constructs a new named workqueue
|
// NewNamed creates a new named queue.
|
||||||
// with ability to inject real or fake clock for testing purposes
|
// Deprecated: Use NewWithConfig instead.
|
||||||
func newNamedQueueWithCustomClock(clock clock.WithTicker, name string, updatePeriod time.Duration, opts ...QueueOption) *Type {
|
func NewNamed(name string) *Type {
|
||||||
config := NewConfig(opts...)
|
return NewWithConfig(QueueConfig{
|
||||||
|
Name: name,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// newQueueWithConfig constructs a new named workqueue
|
||||||
|
// with the ability to customize different properties for testing purposes
|
||||||
|
func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
|
||||||
var metricsFactory *queueMetricsFactory
|
var metricsFactory *queueMetricsFactory
|
||||||
if config.metricsProvider != nil {
|
if config.MetricsProvider != nil {
|
||||||
metricsFactory = &queueMetricsFactory{
|
metricsFactory = &queueMetricsFactory{
|
||||||
metricsProvider: config.metricsProvider,
|
metricsProvider: config.MetricsProvider,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
metricsFactory = &globalMetricsFactory
|
metricsFactory = &globalMetricsFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.Clock == nil {
|
||||||
|
config.Clock = clock.RealClock{}
|
||||||
|
}
|
||||||
|
|
||||||
return newQueue(
|
return newQueue(
|
||||||
clock,
|
config.Clock,
|
||||||
metricsFactory.newQueueMetrics(name, clock),
|
metricsFactory.newQueueMetrics(config.Name, config.Clock),
|
||||||
updatePeriod,
|
updatePeriod,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,8 @@ limitations under the License.
|
|||||||
|
|
||||||
package workqueue
|
package workqueue
|
||||||
|
|
||||||
|
import "k8s.io/utils/clock"
|
||||||
|
|
||||||
// RateLimitingInterface is an interface that rate limits items being added to the queue.
|
// RateLimitingInterface is an interface that rate limits items being added to the queue.
|
||||||
type RateLimitingInterface interface {
|
type RateLimitingInterface interface {
|
||||||
DelayingInterface
|
DelayingInterface
|
||||||
@ -32,29 +34,68 @@ type RateLimitingInterface interface {
|
|||||||
NumRequeues(item interface{}) int
|
NumRequeues(item interface{}) int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
|
||||||
|
|
||||||
|
type RateLimitingQueueConfig struct {
|
||||||
|
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||||
|
Name string
|
||||||
|
|
||||||
|
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
|
||||||
|
// instead of the global provider.
|
||||||
|
MetricsProvider MetricsProvider
|
||||||
|
|
||||||
|
// Clock optionally allows injecting a real or fake clock for testing purposes.
|
||||||
|
Clock clock.WithTicker
|
||||||
|
|
||||||
|
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
|
||||||
|
DelayingQueue DelayingInterface
|
||||||
|
}
|
||||||
|
|
||||||
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
||||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||||
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
|
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||||
// NewNamedRateLimitingQueue instead.
|
// NewRateLimitingQueueWithConfig instead and specify a name.
|
||||||
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
||||||
|
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
|
||||||
|
// with options to customize different properties.
|
||||||
|
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||||
|
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
|
||||||
|
if config.Clock == nil {
|
||||||
|
config.Clock = clock.RealClock{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.DelayingQueue == nil {
|
||||||
|
config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{
|
||||||
|
Name: config.Name,
|
||||||
|
MetricsProvider: config.MetricsProvider,
|
||||||
|
Clock: config.Clock,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return &rateLimitingType{
|
return &rateLimitingType{
|
||||||
DelayingInterface: NewDelayingQueue(),
|
DelayingInterface: config.DelayingQueue,
|
||||||
rateLimiter: rateLimiter,
|
rateLimiter: rateLimiter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string, opts ...QueueOption) RateLimitingInterface {
|
// NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability.
|
||||||
return &rateLimitingType{
|
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
|
||||||
DelayingInterface: NewNamedDelayingQueue(name, opts...),
|
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
|
||||||
rateLimiter: rateLimiter,
|
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
|
||||||
}
|
Name: name,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability
|
||||||
|
// with the option to inject a custom delaying queue instead of the default one.
|
||||||
|
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
|
||||||
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
|
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
|
||||||
return &rateLimitingType{
|
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
|
||||||
DelayingInterface: di,
|
DelayingQueue: di,
|
||||||
rateLimiter: rateLimiter,
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
||||||
|
Loading…
Reference in New Issue
Block a user