diff --git a/util/workqueue/default_rate_limiters.go b/util/workqueue/default_rate_limiters.go index 6609d7c2..efda7c19 100644 --- a/util/workqueue/default_rate_limiters.go +++ b/util/workqueue/default_rate_limiters.go @@ -209,3 +209,30 @@ func (r *MaxOfRateLimiter) Forget(item interface{}) { limiter.Forget(item) } } + +// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long +type WithMaxWaitRateLimiter struct { + limiter RateLimiter + maxDelay time.Duration +} + +func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter { + return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay} +} + +func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration { + delay := w.limiter.When(item) + if delay > w.maxDelay { + return w.maxDelay + } + + return delay +} + +func (w WithMaxWaitRateLimiter) Forget(item interface{}) { + w.limiter.Forget(item) +} + +func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int { + return w.limiter.NumRequeues(item) +} diff --git a/util/workqueue/default_rate_limiters_test.go b/util/workqueue/default_rate_limiters_test.go index 91d34a31..5b1546f3 100644 --- a/util/workqueue/default_rate_limiters_test.go +++ b/util/workqueue/default_rate_limiters_test.go @@ -182,3 +182,24 @@ func TestMaxOfRateLimiter(t *testing.T) { } } + +func TestWithMaxWaitRateLimiter(t *testing.T) { + limiter := NewWithMaxWaitRateLimiter(DefaultControllerRateLimiter(), 500*time.Second) + for i := 0; i < 100; i++ { + if e, a := 5*time.Millisecond, limiter.When(i); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } + + for i := 100; i < 5100; i++ { + if e, a := 500*time.Second, limiter.When(i); e < a { + t.Errorf("expected %v, got %v", e, a) + } + } + + for i := 5100; i < 5200; i++ { + if e, a := 500*time.Second, limiter.When(i); e != a { + t.Errorf("expected %v, got %v", e, a) + } + } +}