Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-06 02:34:03 +00:00)

Merge pull request #124263 from alvaroaleman/typed

Workqueue: Add generic versions that are properly typed

Commit b3f5c57223
@@ -53,7 +53,7 @@ type DiscoveryController struct {
 	// To allow injection for testing.
 	syncFn func(version schema.GroupVersion) error

-	queue workqueue.RateLimitingInterface
+	queue workqueue.TypedRateLimitingInterface[schema.GroupVersion]
 }

 func NewDiscoveryController(
@@ -69,7 +69,10 @@ func NewDiscoveryController(
 		crdLister:  crdInformer.Lister(),
 		crdsSynced: crdInformer.Informer().HasSynced,

-		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DiscoveryController"),
+		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[schema.GroupVersion](),
+			workqueue.TypedRateLimitingQueueConfig[schema.GroupVersion]{Name: "DiscoveryController"},
+		),
 	}

 	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -337,7 +340,7 @@ func (c *DiscoveryController) processNextWorkItem() bool {
 	}
 	defer c.queue.Done(key)

-	err := c.syncFn(key.(schema.GroupVersion))
+	err := c.syncFn(key)
 	if err == nil {
 		c.queue.Forget(key)
 		return true
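The controller change above is the motivating example for the typed API: the item type is declared once on the queue, so processNextWorkItem no longer needs the key.(schema.GroupVersion) assertion. A minimal, self-contained sketch of the same pattern outside the controller (import paths assume the usual k8s.io/client-go and k8s.io/apimachinery modules; the queue name and GroupVersion value are illustrative, not taken from this diff):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[schema.GroupVersion](),
		workqueue.TypedRateLimitingQueueConfig[schema.GroupVersion]{Name: "example"},
	)
	defer queue.ShutDown()

	queue.Add(schema.GroupVersion{Group: "apps", Version: "v1"})

	// Get returns a schema.GroupVersion directly; no key.(schema.GroupVersion) assertion.
	gv, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(gv)
	fmt.Println("syncing", gv.String())
	queue.Forget(gv)
}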
@@ -24,49 +24,66 @@ import (
 	"golang.org/x/time/rate"
 )

-type RateLimiter interface {
+// Deprecated: RateLimiter is deprecated, use TypedRateLimiter instead.
+type RateLimiter TypedRateLimiter[any]
+
+type TypedRateLimiter[T comparable] interface {
 	// When gets an item and gets to decide how long that item should wait
-	When(item interface{}) time.Duration
+	When(item T) time.Duration
 	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
 	// or for success, we'll stop tracking it
-	Forget(item interface{})
+	Forget(item T)
 	// NumRequeues returns back how many failures the item has had
-	NumRequeues(item interface{}) int
+	NumRequeues(item T) int
 }

 // DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
 // both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
+//
+// Deprecated: Use DefaultTypedControllerRateLimiter instead.
 func DefaultControllerRateLimiter() RateLimiter {
-	return NewMaxOfRateLimiter(
-		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
+	return DefaultTypedControllerRateLimiter[any]()
+}
+
+// DefaultTypedControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
+// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
+func DefaultTypedControllerRateLimiter[T comparable]() TypedRateLimiter[T] {
+	return NewTypedMaxOfRateLimiter(
+		NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, 1000*time.Second),
 		// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
-		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+		&TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
 	)
 }

-// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
-type BucketRateLimiter struct {
+// Deprecated: BucketRateLimiter is deprecated, use TypedBucketRateLimiter instead.
+type BucketRateLimiter = TypedBucketRateLimiter[any]
+
+// TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
+type TypedBucketRateLimiter[T comparable] struct {
 	*rate.Limiter
 }

 var _ RateLimiter = &BucketRateLimiter{}

-func (r *BucketRateLimiter) When(item interface{}) time.Duration {
+func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration {
 	return r.Limiter.Reserve().Delay()
 }

-func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
+func (r *TypedBucketRateLimiter[T]) NumRequeues(item T) int {
 	return 0
 }

-func (r *BucketRateLimiter) Forget(item interface{}) {
+func (r *TypedBucketRateLimiter[T]) Forget(item T) {
 }

-// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
+// Deprecated: ItemExponentialFailureRateLimiter is deprecated, use TypedItemExponentialFailureRateLimiter instead.
+type ItemExponentialFailureRateLimiter = TypedItemExponentialFailureRateLimiter[any]
+
+// TypedItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
 // dealing with max failures and expiration are up to the caller
-type ItemExponentialFailureRateLimiter struct {
+type TypedItemExponentialFailureRateLimiter[T comparable] struct {
 	failuresLock sync.Mutex
-	failures     map[interface{}]int
+	failures     map[T]int

 	baseDelay time.Duration
 	maxDelay  time.Duration
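A sketch of how the typed limiter constructors from this hunk compose, assuming the usual k8s.io/client-go/util/workqueue import path; the 30s cap, key strings, and limiter parameters are illustrative, not part of this diff:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same composition as DefaultTypedControllerRateLimiter, but with an
	// explicit cap on the per-item backoff via the max-wait wrapper.
	limiter := workqueue.NewTypedWithMaxWaitRateLimiter[string](
		workqueue.NewTypedMaxOfRateLimiter[string](
			workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
			&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
		),
		30*time.Second,
	)

	fmt.Println(limiter.When("sync/foo"))        // first failure: ~5ms
	fmt.Println(limiter.NumRequeues("sync/foo")) // 1
	limiter.Forget("sync/foo")                   // reset the backoff for this item
}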
@ -74,19 +91,29 @@ type ItemExponentialFailureRateLimiter struct {
|
|||||||
|
|
||||||
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
|
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
|
||||||
|
|
||||||
|
// Deprecated: NewItemExponentialFailureRateLimiter is deprecated, use NewTypedItemExponentialFailureRateLimiter instead.
|
||||||
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
|
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
|
||||||
return &ItemExponentialFailureRateLimiter{
|
return NewTypedItemExponentialFailureRateLimiter[any](baseDelay, maxDelay)
|
||||||
failures: map[interface{}]int{},
|
}
|
||||||
|
|
||||||
|
func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
|
||||||
|
return &TypedItemExponentialFailureRateLimiter[T]{
|
||||||
|
failures: map[T]int{},
|
||||||
baseDelay: baseDelay,
|
baseDelay: baseDelay,
|
||||||
maxDelay: maxDelay,
|
maxDelay: maxDelay,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deprecated: DefaultItemBasedRateLimiter is deprecated, use DefaultTypedItemBasedRateLimiter instead.
|
||||||
func DefaultItemBasedRateLimiter() RateLimiter {
|
func DefaultItemBasedRateLimiter() RateLimiter {
|
||||||
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
|
return DefaultTypedItemBasedRateLimiter[any]()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
|
func DefaultTypedItemBasedRateLimiter[T comparable]() TypedRateLimiter[T] {
|
||||||
|
return NewTypedItemExponentialFailureRateLimiter[T](time.Millisecond, 1000*time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
|
||||||
r.failuresLock.Lock()
|
r.failuresLock.Lock()
|
||||||
defer r.failuresLock.Unlock()
|
defer r.failuresLock.Unlock()
|
||||||
|
|
||||||
@ -107,14 +134,14 @@ func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
|
|||||||
return calculated
|
return calculated
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
|
func (r *TypedItemExponentialFailureRateLimiter[T]) NumRequeues(item T) int {
|
||||||
r.failuresLock.Lock()
|
r.failuresLock.Lock()
|
||||||
defer r.failuresLock.Unlock()
|
defer r.failuresLock.Unlock()
|
||||||
|
|
||||||
return r.failures[item]
|
return r.failures[item]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
|
func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T) {
|
||||||
r.failuresLock.Lock()
|
r.failuresLock.Lock()
|
||||||
defer r.failuresLock.Unlock()
|
defer r.failuresLock.Unlock()
|
||||||
|
|
||||||
@ -122,9 +149,13 @@ func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
|
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
|
||||||
type ItemFastSlowRateLimiter struct {
|
// Deprecated: Use TypedItemFastSlowRateLimiter instead.
|
||||||
|
type ItemFastSlowRateLimiter = TypedItemFastSlowRateLimiter[any]
|
||||||
|
|
||||||
|
// TypedItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
|
||||||
|
type TypedItemFastSlowRateLimiter[T comparable] struct {
|
||||||
failuresLock sync.Mutex
|
failuresLock sync.Mutex
|
||||||
failures map[interface{}]int
|
failures map[T]int
|
||||||
|
|
||||||
maxFastAttempts int
|
maxFastAttempts int
|
||||||
fastDelay time.Duration
|
fastDelay time.Duration
|
||||||
@ -133,16 +164,21 @@ type ItemFastSlowRateLimiter struct {
|
|||||||
|
|
||||||
var _ RateLimiter = &ItemFastSlowRateLimiter{}
|
var _ RateLimiter = &ItemFastSlowRateLimiter{}
|
||||||
|
|
||||||
|
// Deprecated: NewItemFastSlowRateLimiter is deprecated, use NewTypedItemFastSlowRateLimiter instead.
|
||||||
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
|
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
|
||||||
return &ItemFastSlowRateLimiter{
|
return NewTypedItemFastSlowRateLimiter[any](fastDelay, slowDelay, maxFastAttempts)
|
||||||
failures: map[interface{}]int{},
|
}
|
||||||
|
|
||||||
|
func NewTypedItemFastSlowRateLimiter[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) TypedRateLimiter[T] {
|
||||||
|
return &TypedItemFastSlowRateLimiter[T]{
|
||||||
|
failures: map[T]int{},
|
||||||
fastDelay: fastDelay,
|
fastDelay: fastDelay,
|
||||||
slowDelay: slowDelay,
|
slowDelay: slowDelay,
|
||||||
maxFastAttempts: maxFastAttempts,
|
maxFastAttempts: maxFastAttempts,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
|
func (r *TypedItemFastSlowRateLimiter[T]) When(item T) time.Duration {
|
||||||
r.failuresLock.Lock()
|
r.failuresLock.Lock()
|
||||||
defer r.failuresLock.Unlock()
|
defer r.failuresLock.Unlock()
|
||||||
|
|
||||||
@ -155,14 +191,14 @@ func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
|
|||||||
return r.slowDelay
|
return r.slowDelay
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
|
func (r *TypedItemFastSlowRateLimiter[T]) NumRequeues(item T) int {
|
||||||
r.failuresLock.Lock()
|
r.failuresLock.Lock()
|
||||||
defer r.failuresLock.Unlock()
|
defer r.failuresLock.Unlock()
|
||||||
|
|
||||||
return r.failures[item]
|
return r.failures[item]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
|
func (r *TypedItemFastSlowRateLimiter[T]) Forget(item T) {
|
||||||
r.failuresLock.Lock()
|
r.failuresLock.Lock()
|
||||||
defer r.failuresLock.Unlock()
|
defer r.failuresLock.Unlock()
|
||||||
|
|
||||||
@ -172,11 +208,18 @@ func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
|
|||||||
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
|
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
|
||||||
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
|
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
|
||||||
// were separately delayed a longer time.
|
// were separately delayed a longer time.
|
||||||
type MaxOfRateLimiter struct {
|
//
|
||||||
limiters []RateLimiter
|
// Deprecated: Use TypedMaxOfRateLimiter instead.
|
||||||
|
type MaxOfRateLimiter = TypedMaxOfRateLimiter[any]
|
||||||
|
|
||||||
|
// TypedMaxOfRateLimiter calls every RateLimiter and returns the worst case response
|
||||||
|
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
|
||||||
|
// were separately delayed a longer time.
|
||||||
|
type TypedMaxOfRateLimiter[T comparable] struct {
|
||||||
|
limiters []TypedRateLimiter[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
|
func (r *TypedMaxOfRateLimiter[T]) When(item T) time.Duration {
|
||||||
ret := time.Duration(0)
|
ret := time.Duration(0)
|
||||||
for _, limiter := range r.limiters {
|
for _, limiter := range r.limiters {
|
||||||
curr := limiter.When(item)
|
curr := limiter.When(item)
|
||||||
@ -188,11 +231,16 @@ func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
|
// Deprecated: NewMaxOfRateLimiter is deprecated, use NewTypedMaxOfRateLimiter instead.
|
||||||
return &MaxOfRateLimiter{limiters: limiters}
|
func NewMaxOfRateLimiter(limiters ...TypedRateLimiter[any]) RateLimiter {
|
||||||
|
return NewTypedMaxOfRateLimiter(limiters...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
|
func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T] {
|
||||||
|
return &TypedMaxOfRateLimiter[T]{limiters: limiters}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TypedMaxOfRateLimiter[T]) NumRequeues(item T) int {
|
||||||
ret := 0
|
ret := 0
|
||||||
for _, limiter := range r.limiters {
|
for _, limiter := range r.limiters {
|
||||||
curr := limiter.NumRequeues(item)
|
curr := limiter.NumRequeues(item)
|
||||||
@ -204,23 +252,32 @@ func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *MaxOfRateLimiter) Forget(item interface{}) {
|
func (r *TypedMaxOfRateLimiter[T]) Forget(item T) {
|
||||||
for _, limiter := range r.limiters {
|
for _, limiter := range r.limiters {
|
||||||
limiter.Forget(item)
|
limiter.Forget(item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
|
// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
|
||||||
type WithMaxWaitRateLimiter struct {
|
// Deprecated: Use TypedWithMaxWaitRateLimiter instead.
|
||||||
limiter RateLimiter
|
type WithMaxWaitRateLimiter = TypedWithMaxWaitRateLimiter[any]
|
||||||
|
|
||||||
|
// TypedWithMaxWaitRateLimiter have maxDelay which avoids waiting too long
|
||||||
|
type TypedWithMaxWaitRateLimiter[T comparable] struct {
|
||||||
|
limiter TypedRateLimiter[T]
|
||||||
maxDelay time.Duration
|
maxDelay time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deprecated: NewWithMaxWaitRateLimiter is deprecated, use NewTypedWithMaxWaitRateLimiter instead.
|
||||||
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
|
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
|
||||||
return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
|
return NewTypedWithMaxWaitRateLimiter[any](limiter, maxDelay)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
|
func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T] {
|
||||||
|
return &TypedWithMaxWaitRateLimiter[T]{limiter: limiter, maxDelay: maxDelay}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {
|
||||||
delay := w.limiter.When(item)
|
delay := w.limiter.When(item)
|
||||||
if delay > w.maxDelay {
|
if delay > w.maxDelay {
|
||||||
return w.maxDelay
|
return w.maxDelay
|
||||||
@ -229,10 +286,10 @@ func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
|
|||||||
return delay
|
return delay
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
|
func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {
|
||||||
w.limiter.Forget(item)
|
w.limiter.Forget(item)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
|
func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {
|
||||||
return w.limiter.NumRequeues(item)
|
return w.limiter.NumRequeues(item)
|
||||||
}
|
}
|
||||||
|
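The fast/slow and max-wait limiters get the same typed treatment as the exponential one above. A small sketch of the typed fast/slow limiter, keyed here by an int64 for illustration; the delays and attempt budget are arbitrary example values:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Retry quickly a few times, then fall back to the slow delay.
	limiter := workqueue.NewTypedItemFastSlowRateLimiter[int64](
		5*time.Millisecond, // fast delay
		10*time.Second,     // slow delay
		3,                  // max fast attempts
	)

	const snapshotID int64 = 42
	for i := 0; i < 5; i++ {
		fmt.Printf("attempt %d: wait %v\n", i+1, limiter.When(snapshotID))
	}
	// The first three attempts wait 5ms, the remaining ones 10s.
	limiter.Forget(snapshotID)
}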
@@ -27,14 +27,25 @@ import (
 // DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
 // requeue items after failures without ending up in a hot-loop.
-type DelayingInterface interface {
-	Interface
+//
+// Deprecated: use TypedDelayingInterface instead.
+type DelayingInterface TypedDelayingInterface[any]
+
+// TypedDelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
+// requeue items after failures without ending up in a hot-loop.
+type TypedDelayingInterface[T comparable] interface {
+	TypedInterface[T]
 	// AddAfter adds an item to the workqueue after the indicated duration has passed
-	AddAfter(item interface{}, duration time.Duration)
+	AddAfter(item T, duration time.Duration)
 }

 // DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
-type DelayingQueueConfig struct {
+//
+// Deprecated: use TypedDelayingQueueConfig instead.
+type DelayingQueueConfig = TypedDelayingQueueConfig[any]
+
+// TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
+type TypedDelayingQueueConfig[T comparable] struct {
 	// Name for the queue. If unnamed, the metrics will not be registered.
 	Name string

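A typed delaying queue schedules retries without a hot loop; a minimal sketch using the TypedNewDelayingQueue constructor added later in this diff (the item value and one-second delay are illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.TypedNewDelayingQueue[string]()
	defer q.ShutDown()

	// Re-check this node name in roughly one second.
	q.AddAfter("node-1", 1*time.Second)

	item, shutdown := q.Get() // blocks until the delay has elapsed
	if shutdown {
		return
	}
	defer q.Done(item)
	fmt.Println("ready:", item)
}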
@ -46,25 +57,42 @@ type DelayingQueueConfig struct {
|
|||||||
Clock clock.WithTicker
|
Clock clock.WithTicker
|
||||||
|
|
||||||
// Queue optionally allows injecting custom queue Interface instead of the default one.
|
// Queue optionally allows injecting custom queue Interface instead of the default one.
|
||||||
Queue Interface
|
Queue TypedInterface[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
|
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
|
||||||
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
|
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||||
// NewDelayingQueueWithConfig instead and specify a name.
|
// NewDelayingQueueWithConfig instead and specify a name.
|
||||||
|
//
|
||||||
|
// Deprecated: use TypedNewDelayingQueue instead.
|
||||||
func NewDelayingQueue() DelayingInterface {
|
func NewDelayingQueue() DelayingInterface {
|
||||||
return NewDelayingQueueWithConfig(DelayingQueueConfig{})
|
return NewDelayingQueueWithConfig(DelayingQueueConfig{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TypedNewDelayingQueue constructs a new workqueue with delayed queuing ability.
|
||||||
|
// TypedNewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||||
|
// TypedNewDelayingQueueWithConfig instead and specify a name.
|
||||||
|
func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] {
|
||||||
|
return NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[T]{})
|
||||||
|
}
|
||||||
|
|
||||||
// NewDelayingQueueWithConfig constructs a new workqueue with options to
|
// NewDelayingQueueWithConfig constructs a new workqueue with options to
|
||||||
// customize different properties.
|
// customize different properties.
|
||||||
|
//
|
||||||
|
// Deprecated: use TypedNewDelayingQueueWithConfig instead.
|
||||||
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
|
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
|
||||||
|
return NewTypedDelayingQueueWithConfig[any](config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTypedDelayingQueueWithConfig constructs a new workqueue with options to
|
||||||
|
// customize different properties.
|
||||||
|
func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] {
|
||||||
if config.Clock == nil {
|
if config.Clock == nil {
|
||||||
config.Clock = clock.RealClock{}
|
config.Clock = clock.RealClock{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Queue == nil {
|
if config.Queue == nil {
|
||||||
config.Queue = NewWithConfig(QueueConfig{
|
config.Queue = NewTypedWithConfig[T](TypedQueueConfig[T]{
|
||||||
Name: config.Name,
|
Name: config.Name,
|
||||||
MetricsProvider: config.MetricsProvider,
|
MetricsProvider: config.MetricsProvider,
|
||||||
Clock: config.Clock,
|
Clock: config.Clock,
|
||||||
@ -100,9 +128,9 @@ func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) Delayi
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
|
func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
|
||||||
ret := &delayingType{
|
ret := &delayingType[T]{
|
||||||
Interface: q,
|
TypedInterface: q,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
heartbeat: clock.NewTicker(maxWait),
|
heartbeat: clock.NewTicker(maxWait),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
@ -115,8 +143,8 @@ func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider
|
|||||||
}
|
}
|
||||||
|
|
||||||
// delayingType wraps an Interface and provides delayed re-enquing
|
// delayingType wraps an Interface and provides delayed re-enquing
|
||||||
type delayingType struct {
|
type delayingType[T comparable] struct {
|
||||||
Interface
|
TypedInterface[T]
|
||||||
|
|
||||||
// clock tracks time for delayed firing
|
// clock tracks time for delayed firing
|
||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
@ -193,16 +221,16 @@ func (pq waitForPriorityQueue) Peek() interface{} {
|
|||||||
|
|
||||||
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
||||||
// on Get() will be true. This method may be invoked more than once.
|
// on Get() will be true. This method may be invoked more than once.
|
||||||
func (q *delayingType) ShutDown() {
|
func (q *delayingType[T]) ShutDown() {
|
||||||
q.stopOnce.Do(func() {
|
q.stopOnce.Do(func() {
|
||||||
q.Interface.ShutDown()
|
q.TypedInterface.ShutDown()
|
||||||
close(q.stopCh)
|
close(q.stopCh)
|
||||||
q.heartbeat.Stop()
|
q.heartbeat.Stop()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddAfter adds the given item to the work queue after the given delay
|
// AddAfter adds the given item to the work queue after the given delay
|
||||||
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
|
func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
|
||||||
// don't add if we're already shutting down
|
// don't add if we're already shutting down
|
||||||
if q.ShuttingDown() {
|
if q.ShuttingDown() {
|
||||||
return
|
return
|
||||||
@ -229,7 +257,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
|
|||||||
const maxWait = 10 * time.Second
|
const maxWait = 10 * time.Second
|
||||||
|
|
||||||
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
|
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
|
||||||
func (q *delayingType) waitingLoop() {
|
func (q *delayingType[T]) waitingLoop() {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
// Make a placeholder channel to use when there are no items in our list
|
// Make a placeholder channel to use when there are no items in our list
|
||||||
@ -244,7 +272,7 @@ func (q *delayingType) waitingLoop() {
|
|||||||
waitingEntryByData := map[t]*waitFor{}
|
waitingEntryByData := map[t]*waitFor{}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if q.Interface.ShuttingDown() {
|
if q.TypedInterface.ShuttingDown() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -258,7 +286,7 @@ func (q *delayingType) waitingLoop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
entry = heap.Pop(waitingForQueue).(*waitFor)
|
entry = heap.Pop(waitingForQueue).(*waitFor)
|
||||||
q.Add(entry.data)
|
q.Add(entry.data.(T))
|
||||||
delete(waitingEntryByData, entry.data)
|
delete(waitingEntryByData, entry.data)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -287,7 +315,7 @@ func (q *delayingType) waitingLoop() {
|
|||||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||||
} else {
|
} else {
|
||||||
q.Add(waitEntry.data)
|
q.Add(waitEntry.data.(T))
|
||||||
}
|
}
|
||||||
|
|
||||||
drained := false
|
drained := false
|
||||||
@ -297,7 +325,7 @@ func (q *delayingType) waitingLoop() {
|
|||||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||||
} else {
|
} else {
|
||||||
q.Add(waitEntry.data)
|
q.Add(waitEntry.data.(T))
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
drained = true
|
drained = true
|
||||||
|
@ -241,7 +241,7 @@ func waitForAdded(q DelayingInterface, depth int) error {
|
|||||||
|
|
||||||
func waitForWaitingQueueToFill(q DelayingInterface) error {
|
func waitForWaitingQueueToFill(q DelayingInterface) error {
|
||||||
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||||
if len(q.(*delayingType).waitingForAddCh) == 0 {
|
if len(q.(*delayingType[any]).waitingForAddCh) == 0 {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ func TestMetricShutdown(t *testing.T) {
|
|||||||
updateCalled: ch,
|
updateCalled: ch,
|
||||||
}
|
}
|
||||||
c := testingclock.NewFakeClock(time.Now())
|
c := testingclock.NewFakeClock(time.Now())
|
||||||
q := newQueue(c, DefaultQueue(), m, time.Millisecond)
|
q := newQueue[any](c, DefaultQueue[any](), m, time.Millisecond)
|
||||||
for !c.HasWaiters() {
|
for !c.HasWaiters() {
|
||||||
// Wait for the go routine to call NewTicker()
|
// Wait for the go routine to call NewTicker()
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
@ -176,7 +176,7 @@ func TestMetrics(t *testing.T) {
|
|||||||
Clock: c,
|
Clock: c,
|
||||||
MetricsProvider: &mp,
|
MetricsProvider: &mp,
|
||||||
}
|
}
|
||||||
q := newQueueWithConfig(config, time.Millisecond)
|
q := newQueueWithConfig[any](config, time.Millisecond)
|
||||||
defer q.ShutDown()
|
defer q.ShutDown()
|
||||||
for !c.HasWaiters() {
|
for !c.HasWaiters() {
|
||||||
// Wait for the go routine to call NewTicker()
|
// Wait for the go routine to call NewTicker()
|
||||||
|
@@ -23,11 +23,14 @@ import (
 	"k8s.io/utils/clock"
 )

-type Interface interface {
-	Add(item interface{})
+// Deprecated: Interface is deprecated, use TypedInterface instead.
+type Interface TypedInterface[any]
+
+type TypedInterface[T comparable] interface {
+	Add(item T)
 	Len() int
-	Get() (item interface{}, shutdown bool)
-	Done(item interface{})
+	Get() (item T, shutdown bool)
+	Done(item T)
 	ShutDown()
 	ShutDownWithDrain()
 	ShuttingDown() bool
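TypedInterface keeps the same Add/Get/Done contract; only the item type changes. A sketch of the standard worker loop over the typed interface, using the NewTyped constructor introduced further down in this diff (the item strings and process function are stand-ins):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// runWorker drains a typed queue until it is shut down. Retry handling is
// left to the rate-limited variant; this only shows the Get/Done pairing.
func runWorker(q workqueue.TypedInterface[string], process func(string) error) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		// Done must be called once processing finishes so a re-Add of the
		// same item can be delivered again.
		err := process(item)
		q.Done(item)
		if err != nil {
			fmt.Println("error processing", item, ":", err)
		}
	}
}

func main() {
	q := workqueue.NewTyped[string]()
	q.Add("object-a")
	q.Add("object-b")
	q.ShutDown() // items already queued are still delivered; Get reports shutdown once drained

	runWorker(q, func(item string) error {
		fmt.Println("processing", item)
		return nil
	})
}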
@@ -35,48 +38,51 @@ type Interface interface {

 // Queue is the underlying storage for items. The functions below are always
 // called from the same goroutine.
-type Queue interface {
+type Queue[T comparable] interface {
 	// Touch can be hooked when an existing item is added again. This may be
 	// useful if the implementation allows priority change for the given item.
-	Touch(item interface{})
+	Touch(item T)
 	// Push adds a new item.
-	Push(item interface{})
+	Push(item T)
 	// Len tells the total number of items.
 	Len() int
 	// Pop retrieves an item.
-	Pop() (item interface{})
+	Pop() (item T)
 }

 // DefaultQueue is a slice based FIFO queue.
-func DefaultQueue() Queue {
-	return new(queue)
+func DefaultQueue[T comparable]() Queue[T] {
+	return new(queue[T])
 }

 // queue is a slice which implements Queue.
-type queue []interface{}
+type queue[T comparable] []T

-func (q *queue) Touch(item interface{}) {}
+func (q *queue[T]) Touch(item T) {}

-func (q *queue) Push(item interface{}) {
+func (q *queue[T]) Push(item T) {
 	*q = append(*q, item)
 }

-func (q *queue) Len() int {
+func (q *queue[T]) Len() int {
 	return len(*q)
 }

-func (q *queue) Pop() (item interface{}) {
+func (q *queue[T]) Pop() (item T) {
 	item = (*q)[0]

 	// The underlying array still exists and reference this object, so the object will not be garbage collected.
-	(*q)[0] = nil
+	(*q)[0] = *new(T)
 	*q = (*q)[1:]

 	return item
 }

 // QueueConfig specifies optional configurations to customize an Interface.
-type QueueConfig struct {
+// Deprecated: use TypedQueueConfig instead.
+type QueueConfig = TypedQueueConfig[any]
+
+type TypedQueueConfig[T comparable] struct {
 	// Name for the queue. If unnamed, the metrics will not be registered.
 	Name string

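Because the Queue abstraction is generic as well, a custom backing store can be injected through TypedQueueConfig without any interface{} plumbing. A toy sketch of a LIFO-ish backing queue (the type, its name, and the items are illustrative):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// lifo is a toy Queue[string] that pops the most recently pushed item first.
type lifo []string

func (q *lifo) Touch(item string) {}
func (q *lifo) Push(item string)  { *q = append(*q, item) }
func (q *lifo) Len() int          { return len(*q) }
func (q *lifo) Pop() (item string) {
	last := len(*q) - 1
	item = (*q)[last]
	*q = (*q)[:last]
	return item
}

func main() {
	q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{
		Name:  "lifo-example",
		Queue: &lifo{},
	})
	q.Add("first")
	q.Add("second")
	item, _ := q.Get() // "second", because of the LIFO backing store
	fmt.Println(item)
	q.Done(item)
}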
@ -88,19 +94,36 @@ type QueueConfig struct {
|
|||||||
Clock clock.WithTicker
|
Clock clock.WithTicker
|
||||||
|
|
||||||
// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
|
// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
|
||||||
Queue Queue
|
Queue Queue[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
// New constructs a new work queue (see the package comment).
|
// New constructs a new work queue (see the package comment).
|
||||||
|
//
|
||||||
|
// Deprecated: use NewTyped instead.
|
||||||
func New() *Type {
|
func New() *Type {
|
||||||
return NewWithConfig(QueueConfig{
|
return NewWithConfig(QueueConfig{
|
||||||
Name: "",
|
Name: "",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewTyped constructs a new work queue (see the package comment).
|
||||||
|
func NewTyped[T comparable]() *Typed[T] {
|
||||||
|
return NewTypedWithConfig(TypedQueueConfig[T]{
|
||||||
|
Name: "",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// NewWithConfig constructs a new workqueue with ability to
|
// NewWithConfig constructs a new workqueue with ability to
|
||||||
// customize different properties.
|
// customize different properties.
|
||||||
|
//
|
||||||
|
// Deprecated: use NewTypedWithConfig instead.
|
||||||
func NewWithConfig(config QueueConfig) *Type {
|
func NewWithConfig(config QueueConfig) *Type {
|
||||||
|
return NewTypedWithConfig(config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTypedWithConfig constructs a new workqueue with ability to
|
||||||
|
// customize different properties.
|
||||||
|
func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {
|
||||||
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
|
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,7 +137,7 @@ func NewNamed(name string) *Type {
|
|||||||
|
|
||||||
// newQueueWithConfig constructs a new named workqueue
|
// newQueueWithConfig constructs a new named workqueue
|
||||||
// with the ability to customize different properties for testing purposes
|
// with the ability to customize different properties for testing purposes
|
||||||
func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
|
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
|
||||||
var metricsFactory *queueMetricsFactory
|
var metricsFactory *queueMetricsFactory
|
||||||
if config.MetricsProvider != nil {
|
if config.MetricsProvider != nil {
|
||||||
metricsFactory = &queueMetricsFactory{
|
metricsFactory = &queueMetricsFactory{
|
||||||
@ -129,7 +152,7 @@ func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if config.Queue == nil {
|
if config.Queue == nil {
|
||||||
config.Queue = DefaultQueue()
|
config.Queue = DefaultQueue[T]()
|
||||||
}
|
}
|
||||||
|
|
||||||
return newQueue(
|
return newQueue(
|
||||||
@ -140,12 +163,12 @@ func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newQueue(c clock.WithTicker, queue Queue, metrics queueMetrics, updatePeriod time.Duration) *Type {
|
func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] {
|
||||||
t := &Type{
|
t := &Typed[T]{
|
||||||
clock: c,
|
clock: c,
|
||||||
queue: queue,
|
queue: queue,
|
||||||
dirty: set{},
|
dirty: set[T]{},
|
||||||
processing: set{},
|
processing: set[T]{},
|
||||||
cond: sync.NewCond(&sync.Mutex{}),
|
cond: sync.NewCond(&sync.Mutex{}),
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
unfinishedWorkUpdatePeriod: updatePeriod,
|
unfinishedWorkUpdatePeriod: updatePeriod,
|
||||||
@ -163,20 +186,23 @@ func newQueue(c clock.WithTicker, queue Queue, metrics queueMetrics, updatePerio
|
|||||||
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
|
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
|
||||||
|
|
||||||
// Type is a work queue (see the package comment).
|
// Type is a work queue (see the package comment).
|
||||||
type Type struct {
|
// Deprecated: Use Typed instead.
|
||||||
|
type Type = Typed[any]
|
||||||
|
|
||||||
|
type Typed[t comparable] struct {
|
||||||
// queue defines the order in which we will work on items. Every
|
// queue defines the order in which we will work on items. Every
|
||||||
// element of queue should be in the dirty set and not in the
|
// element of queue should be in the dirty set and not in the
|
||||||
// processing set.
|
// processing set.
|
||||||
queue Queue
|
queue Queue[t]
|
||||||
|
|
||||||
// dirty defines all of the items that need to be processed.
|
// dirty defines all of the items that need to be processed.
|
||||||
dirty set
|
dirty set[t]
|
||||||
|
|
||||||
// Things that are currently being processed are in the processing set.
|
// Things that are currently being processed are in the processing set.
|
||||||
// These things may be simultaneously in the dirty set. When we finish
|
// These things may be simultaneously in the dirty set. When we finish
|
||||||
// processing something and remove it from this set, we'll check if
|
// processing something and remove it from this set, we'll check if
|
||||||
// it's in the dirty set, and if so, add it to the queue.
|
// it's in the dirty set, and if so, add it to the queue.
|
||||||
processing set
|
processing set[t]
|
||||||
|
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
|
|
||||||
@ -191,27 +217,27 @@ type Type struct {
|
|||||||
|
|
||||||
type empty struct{}
|
type empty struct{}
|
||||||
type t interface{}
|
type t interface{}
|
||||||
type set map[t]empty
|
type set[t comparable] map[t]empty
|
||||||
|
|
||||||
func (s set) has(item t) bool {
|
func (s set[t]) has(item t) bool {
|
||||||
_, exists := s[item]
|
_, exists := s[item]
|
||||||
return exists
|
return exists
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s set) insert(item t) {
|
func (s set[t]) insert(item t) {
|
||||||
s[item] = empty{}
|
s[item] = empty{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s set) delete(item t) {
|
func (s set[t]) delete(item t) {
|
||||||
delete(s, item)
|
delete(s, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s set) len() int {
|
func (s set[t]) len() int {
|
||||||
return len(s)
|
return len(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add marks item as needing processing.
|
// Add marks item as needing processing.
|
||||||
func (q *Type) Add(item interface{}) {
|
func (q *Typed[T]) Add(item T) {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
if q.shuttingDown {
|
if q.shuttingDown {
|
||||||
@ -240,7 +266,7 @@ func (q *Type) Add(item interface{}) {
|
|||||||
// Len returns the current queue length, for informational purposes only. You
|
// Len returns the current queue length, for informational purposes only. You
|
||||||
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
|
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
|
||||||
// value, that can't be synchronized properly.
|
// value, that can't be synchronized properly.
|
||||||
func (q *Type) Len() int {
|
func (q *Typed[T]) Len() int {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
return q.queue.Len()
|
return q.queue.Len()
|
||||||
@ -249,7 +275,7 @@ func (q *Type) Len() int {
|
|||||||
// Get blocks until it can return an item to be processed. If shutdown = true,
|
// Get blocks until it can return an item to be processed. If shutdown = true,
|
||||||
// the caller should end their goroutine. You must call Done with item when you
|
// the caller should end their goroutine. You must call Done with item when you
|
||||||
// have finished processing it.
|
// have finished processing it.
|
||||||
func (q *Type) Get() (item interface{}, shutdown bool) {
|
func (q *Typed[T]) Get() (item T, shutdown bool) {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
for q.queue.Len() == 0 && !q.shuttingDown {
|
for q.queue.Len() == 0 && !q.shuttingDown {
|
||||||
@ -257,7 +283,7 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
|
|||||||
}
|
}
|
||||||
if q.queue.Len() == 0 {
|
if q.queue.Len() == 0 {
|
||||||
// We must be shutting down.
|
// We must be shutting down.
|
||||||
return nil, true
|
return *new(T), true
|
||||||
}
|
}
|
||||||
|
|
||||||
item = q.queue.Pop()
|
item = q.queue.Pop()
|
||||||
@ -273,7 +299,7 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
|
|||||||
// Done marks item as done processing, and if it has been marked as dirty again
|
// Done marks item as done processing, and if it has been marked as dirty again
|
||||||
// while it was being processed, it will be re-added to the queue for
|
// while it was being processed, it will be re-added to the queue for
|
||||||
// re-processing.
|
// re-processing.
|
||||||
func (q *Type) Done(item interface{}) {
|
func (q *Typed[T]) Done(item T) {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
|
|
||||||
@ -290,7 +316,7 @@ func (q *Type) Done(item interface{}) {
|
|||||||
|
|
||||||
// ShutDown will cause q to ignore all new items added to it and
|
// ShutDown will cause q to ignore all new items added to it and
|
||||||
// immediately instruct the worker goroutines to exit.
|
// immediately instruct the worker goroutines to exit.
|
||||||
func (q *Type) ShutDown() {
|
func (q *Typed[T]) ShutDown() {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
|
|
||||||
@ -308,7 +334,7 @@ func (q *Type) ShutDown() {
|
|||||||
// indefinitely. It is, however, safe to call ShutDown after having called
|
// indefinitely. It is, however, safe to call ShutDown after having called
|
||||||
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
||||||
// without waiting for the drainage.
|
// without waiting for the drainage.
|
||||||
func (q *Type) ShutDownWithDrain() {
|
func (q *Typed[T]) ShutDownWithDrain() {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
|
|
||||||
@ -321,14 +347,14 @@ func (q *Type) ShutDownWithDrain() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Type) ShuttingDown() bool {
|
func (q *Typed[T]) ShuttingDown() bool {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
|
|
||||||
return q.shuttingDown
|
return q.shuttingDown
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Type) updateUnfinishedWorkLoop() {
|
func (q *Typed[T]) updateUnfinishedWorkLoop() {
|
||||||
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
|
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
for range t.C() {
|
for range t.C() {
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
|
|
||||||
// traceQueue traces whether items are touched
|
// traceQueue traces whether items are touched
|
||||||
type traceQueue struct {
|
type traceQueue struct {
|
||||||
workqueue.Queue
|
workqueue.Queue[any]
|
||||||
|
|
||||||
touched map[interface{}]struct{}
|
touched map[interface{}]struct{}
|
||||||
}
|
}
|
||||||
@ -42,7 +42,7 @@ func (t *traceQueue) Touch(item interface{}) {
|
|||||||
t.touched[item] = struct{}{}
|
t.touched[item] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ workqueue.Queue = &traceQueue{}
|
var _ workqueue.Queue[any] = &traceQueue{}
|
||||||
|
|
||||||
func TestBasic(t *testing.T) {
|
func TestBasic(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
@ -215,7 +215,7 @@ func TestReinsert(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCollapse(t *testing.T) {
|
func TestCollapse(t *testing.T) {
|
||||||
tq := &traceQueue{Queue: workqueue.DefaultQueue()}
|
tq := &traceQueue{Queue: workqueue.DefaultQueue[any]()}
|
||||||
q := workqueue.NewWithConfig(workqueue.QueueConfig{
|
q := workqueue.NewWithConfig(workqueue.QueueConfig{
|
||||||
Name: "",
|
Name: "",
|
||||||
Queue: tq,
|
Queue: tq,
|
||||||
@ -244,7 +244,7 @@ func TestCollapse(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCollapseWhileProcessing(t *testing.T) {
|
func TestCollapseWhileProcessing(t *testing.T) {
|
||||||
tq := &traceQueue{Queue: workqueue.DefaultQueue()}
|
tq := &traceQueue{Queue: workqueue.DefaultQueue[any]()}
|
||||||
q := workqueue.NewWithConfig(workqueue.QueueConfig{
|
q := workqueue.NewWithConfig(workqueue.QueueConfig{
|
||||||
Name: "",
|
Name: "",
|
||||||
Queue: tq,
|
Queue: tq,
|
||||||
|
@@ -19,24 +19,33 @@ package workqueue
 import "k8s.io/utils/clock"

 // RateLimitingInterface is an interface that rate limits items being added to the queue.
-type RateLimitingInterface interface {
-	DelayingInterface
+//
+// Deprecated: Use TypedRateLimitingInterface instead.
+type RateLimitingInterface TypedRateLimitingInterface[any]
+
+// TypedRateLimitingInterface is an interface that rate limits items being added to the queue.
+type TypedRateLimitingInterface[T comparable] interface {
+	TypedDelayingInterface[T]

 	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
-	AddRateLimited(item interface{})
+	AddRateLimited(item T)

 	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
 	// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
 	// still have to call `Done` on the queue.
-	Forget(item interface{})
+	Forget(item T)

 	// NumRequeues returns back how many times the item was requeued
-	NumRequeues(item interface{}) int
+	NumRequeues(item T) int
 }

 // RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
-type RateLimitingQueueConfig struct {
+//
+// Deprecated: Use TypedRateLimitingQueueConfig instead.
+type RateLimitingQueueConfig = TypedRateLimitingQueueConfig[any]
+
+// TypedRateLimitingQueueConfig specifies optional configurations to customize a TypedRateLimitingInterface.
+type TypedRateLimitingQueueConfig[T comparable] struct {
 	// Name for the queue. If unnamed, the metrics will not be registered.
 	Name string

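The usual controller retry pattern carries over unchanged, just without type assertions. A sketch of handling sync errors with a bounded requeue budget (the limit of 5, the key, and the error are illustrative):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

const maxRetries = 5 // illustrative retry budget

func handleErr(queue workqueue.TypedRateLimitingInterface[string], key string, err error) {
	if err == nil {
		queue.Forget(key) // success: stop tracking backoff for this key
		return
	}
	if queue.NumRequeues(key) < maxRetries {
		queue.AddRateLimited(key) // retry with per-item backoff
		return
	}
	queue.Forget(key) // give up on this key
	fmt.Println("dropping", key, "out of the queue:", err)
}

func main() {
	queue := workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedControllerRateLimiter[string](),
	)
	defer queue.ShutDown()
	handleErr(queue, "default/my-object", fmt.Errorf("transient failure"))
}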
@ -48,36 +57,55 @@ type RateLimitingQueueConfig struct {
|
|||||||
Clock clock.WithTicker
|
Clock clock.WithTicker
|
||||||
|
|
||||||
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
|
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
|
||||||
DelayingQueue DelayingInterface
|
DelayingQueue TypedDelayingInterface[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
||||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||||
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
|
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||||
// NewRateLimitingQueueWithConfig instead and specify a name.
|
// NewRateLimitingQueueWithConfig instead and specify a name.
|
||||||
|
//
|
||||||
|
// Deprecated: Use NewTypedRateLimitingQueue instead.
|
||||||
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
||||||
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
|
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewTypedRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
||||||
|
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||||
|
// NewTypedRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
|
||||||
|
// NewTypedRateLimitingQueueWithConfig instead and specify a name.
|
||||||
|
func NewTypedRateLimitingQueue[T comparable](rateLimiter TypedRateLimiter[T]) TypedRateLimitingInterface[T] {
|
||||||
|
return NewTypedRateLimitingQueueWithConfig(rateLimiter, TypedRateLimitingQueueConfig[T]{})
|
||||||
|
}
|
||||||
|
|
||||||
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
|
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
|
||||||
// with options to customize different properties.
|
// with options to customize different properties.
|
||||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||||
|
//
|
||||||
|
// Deprecated: Use NewTypedRateLimitingQueueWithConfig instead.
|
||||||
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
|
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
|
||||||
|
return NewTypedRateLimitingQueueWithConfig(rateLimiter, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTypedRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
|
||||||
|
// with options to customize different properties.
|
||||||
|
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||||
|
func NewTypedRateLimitingQueueWithConfig[T comparable](rateLimiter TypedRateLimiter[T], config TypedRateLimitingQueueConfig[T]) TypedRateLimitingInterface[T] {
|
||||||
if config.Clock == nil {
|
if config.Clock == nil {
|
||||||
config.Clock = clock.RealClock{}
|
config.Clock = clock.RealClock{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.DelayingQueue == nil {
|
if config.DelayingQueue == nil {
|
||||||
config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{
|
config.DelayingQueue = NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[T]{
|
||||||
Name: config.Name,
|
Name: config.Name,
|
||||||
MetricsProvider: config.MetricsProvider,
|
MetricsProvider: config.MetricsProvider,
|
||||||
Clock: config.Clock,
|
Clock: config.Clock,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return &rateLimitingType{
|
return &rateLimitingType[T]{
|
||||||
DelayingInterface: config.DelayingQueue,
|
TypedDelayingInterface: config.DelayingQueue,
|
||||||
rateLimiter: rateLimiter,
|
rateLimiter: rateLimiter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
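Any value implementing TypedRateLimiter[T] can be handed to these constructors, which makes small purpose-built limiters easy to write. A sketch of a fixed-delay limiter plugged into NewTypedRateLimitingQueue (the type and the 250ms delay are illustrative, not from this diff):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// fixedDelayLimiter is a toy TypedRateLimiter that always waits the same
// amount of time and never tracks failures.
type fixedDelayLimiter[T comparable] struct {
	delay time.Duration
}

func (f *fixedDelayLimiter[T]) When(item T) time.Duration { return f.delay }
func (f *fixedDelayLimiter[T]) Forget(item T)             {}
func (f *fixedDelayLimiter[T]) NumRequeues(item T) int    { return 0 }

func main() {
	limiter := &fixedDelayLimiter[string]{delay: 250 * time.Millisecond}
	queue := workqueue.NewTypedRateLimitingQueue[string](limiter)
	defer queue.ShutDown()

	queue.AddRateLimited("configmap/default/settings")
	item, _ := queue.Get() // delivered after roughly 250ms
	fmt.Println(item)
	queue.Done(item)
}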
@ -99,21 +127,21 @@ func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter
|
|||||||
}
|
}
|
||||||
|
|
||||||
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
||||||
type rateLimitingType struct {
|
type rateLimitingType[T comparable] struct {
|
||||||
DelayingInterface
|
TypedDelayingInterface[T]
|
||||||
|
|
||||||
rateLimiter RateLimiter
|
rateLimiter TypedRateLimiter[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
|
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
|
||||||
func (q *rateLimitingType) AddRateLimited(item interface{}) {
|
func (q *rateLimitingType[T]) AddRateLimited(item T) {
|
||||||
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
|
q.TypedDelayingInterface.AddAfter(item, q.rateLimiter.When(item))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *rateLimitingType) NumRequeues(item interface{}) int {
|
func (q *rateLimitingType[T]) NumRequeues(item T) int {
|
||||||
return q.rateLimiter.NumRequeues(item)
|
return q.rateLimiter.NumRequeues(item)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *rateLimitingType) Forget(item interface{}) {
|
func (q *rateLimitingType[T]) Forget(item T) {
|
||||||
q.rateLimiter.Forget(item)
|
q.rateLimiter.Forget(item)
|
||||||
}
|
}
|
||||||
|
@ -25,17 +25,17 @@ import (
|
|||||||
|
|
||||||
func TestRateLimitingQueue(t *testing.T) {
|
func TestRateLimitingQueue(t *testing.T) {
|
||||||
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
|
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
|
||||||
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
|
queue := NewRateLimitingQueue(limiter).(*rateLimitingType[any])
|
||||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||||
delayingQueue := &delayingType{
|
delayingQueue := &delayingType[any]{
|
||||||
Interface: New(),
|
TypedInterface: New(),
|
||||||
clock: fakeClock,
|
clock: fakeClock,
|
||||||
heartbeat: fakeClock.NewTicker(maxWait),
|
heartbeat: fakeClock.NewTicker(maxWait),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
waitingForAddCh: make(chan *waitFor, 1000),
|
waitingForAddCh: make(chan *waitFor, 1000),
|
||||||
metrics: newRetryMetrics("", nil),
|
metrics: newRetryMetrics("", nil),
|
||||||
}
|
}
|
||||||
queue.DelayingInterface = delayingQueue
|
queue.TypedDelayingInterface = delayingQueue
|
||||||
|
|
||||||
queue.AddRateLimited("one")
|
queue.AddRateLimited("one")
|
||||||
waitEntry := <-delayingQueue.waitingForAddCh
|
waitEntry := <-delayingQueue.waitingForAddCh
|
||||||
|