From bf95b124e66acf503e6699e42504d8a5d45883ca Mon Sep 17 00:00:00 2001 From: deads2k Date: Fri, 8 Apr 2016 13:43:57 -0400 Subject: [PATCH] add built-in ratelimiter to workqueue --- pkg/util/workqueue/default_rate_limiters.go | 204 ++++++++++++++++++ .../workqueue/default_rate_limiters_test.go | 151 +++++++++++++ pkg/util/workqueue/rate_limitting_queue.go | 61 ++++++ .../workqueue/rate_limitting_queue_test.go | 74 +++++++ 4 files changed, 490 insertions(+) create mode 100644 pkg/util/workqueue/default_rate_limiters.go create mode 100644 pkg/util/workqueue/default_rate_limiters_test.go create mode 100644 pkg/util/workqueue/rate_limitting_queue.go create mode 100644 pkg/util/workqueue/rate_limitting_queue_test.go diff --git a/pkg/util/workqueue/default_rate_limiters.go b/pkg/util/workqueue/default_rate_limiters.go new file mode 100644 index 00000000000..6ca484b62cb --- /dev/null +++ b/pkg/util/workqueue/default_rate_limiters.go @@ -0,0 +1,204 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "math" + "sync" + "time" + + "github.com/juju/ratelimit" +) + +type RateLimiter interface { + // When gets an item and gets to decide how long that item should wait + When(item interface{}) time.Duration + // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing + // or for success, we'll stop tracking it + Forget(item interface{}) + // NumRequeues returns back how many failures the item has had + NumRequeues(item interface{}) int +} + +// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has +// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential +func DefaultControllerRateLimiter() RateLimiter { + return NewMaxOfRateLimiter( + DefaultItemBasedRateLimiter(), + // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + &BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))}, + ) +} + +// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API +type BucketRateLimiter struct { + *ratelimit.Bucket +} + +var _ RateLimiter = &BucketRateLimiter{} + +func (r *BucketRateLimiter) When(item interface{}) time.Duration { + return r.Bucket.Take(1) +} + +func (r *BucketRateLimiter) NumRequeues(item interface{}) int { + return 0 +} + +func (r *BucketRateLimiter) Forget(item interface{}) { +} + +// ItemExponentialFailureRateLimiter does a simple baseDelay*10^ limit +// dealing with max failures and expiration are up to the caller +type ItemExponentialFailureRateLimiter struct { + failuresLock sync.Mutex + failures map[interface{}]int + + baseDelay time.Duration + maxDelay time.Duration +} + +var _ RateLimiter = &ItemExponentialFailureRateLimiter{} + +func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter { + return &ItemExponentialFailureRateLimiter{ + failures: map[interface{}]int{}, + baseDelay: baseDelay, + maxDelay: maxDelay, + } +} + +func DefaultItemBasedRateLimiter() RateLimiter { + return NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second) +} + +func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + r.failures[item] = r.failures[item] + 1 + + calculated := r.baseDelay * time.Duration(math.Pow10(r.failures[item]-1)) + if calculated > r.maxDelay { + return r.maxDelay + } + + return calculated +} + +func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + return r.failures[item] +} + +func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + delete(r.failures, item) +} + +// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that +type ItemFastSlowRateLimiter struct { + failuresLock sync.Mutex + failures map[interface{}]int + + maxFastAttempts int + fastDelay time.Duration + slowDelay time.Duration +} + +var _ RateLimiter = &ItemFastSlowRateLimiter{} + +func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter { + return &ItemFastSlowRateLimiter{ + failures: map[interface{}]int{}, + fastDelay: fastDelay, + slowDelay: slowDelay, + maxFastAttempts: maxFastAttempts, + } +} + +func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + r.failures[item] = r.failures[item] + 1 + + if r.failures[item] <= r.maxFastAttempts { + return r.fastDelay + } + + return r.slowDelay +} + +func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + return r.failures[item] +} + +func (r *ItemFastSlowRateLimiter) Forget(item interface{}) { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + delete(r.failures, item) +} + +// MaxOfRateLimiter calls every RateLimiter and returns the worst case response +// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items +// were separately delayed a longer time. +type MaxOfRateLimiter struct { + limiters []RateLimiter +} + +func (r *MaxOfRateLimiter) When(item interface{}) time.Duration { + ret := time.Duration(0) + for _, limiter := range r.limiters { + curr := limiter.When(item) + if curr > ret { + ret = curr + } + } + + return ret +} + +func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { + return &MaxOfRateLimiter{limiters: limiters} +} + +func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int { + ret := 0 + for _, limiter := range r.limiters { + curr := limiter.NumRequeues(item) + if curr > ret { + ret = curr + } + } + + return ret +} + +func (r *MaxOfRateLimiter) Forget(item interface{}) { + for _, limiter := range r.limiters { + limiter.Forget(item) + } +} diff --git a/pkg/util/workqueue/default_rate_limiters_test.go b/pkg/util/workqueue/default_rate_limiters_test.go new file mode 100644 index 00000000000..8a96fb04e28 --- /dev/null +++ b/pkg/util/workqueue/default_rate_limiters_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" +) + +func TestItemExponentialFailureRateLimiter(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second) + + if e, a := 1*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 100*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestItemFastSlowRateLimiter(t *testing.T) { + limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3) + + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestMaxOfRateLimiter(t *testing.T) { + limiter := NewMaxOfRateLimiter( + NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3), + NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second), + ) + + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 100*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 3*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 3*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} diff --git a/pkg/util/workqueue/rate_limitting_queue.go b/pkg/util/workqueue/rate_limitting_queue.go new file mode 100644 index 00000000000..272ba52d9e3 --- /dev/null +++ b/pkg/util/workqueue/rate_limitting_queue.go @@ -0,0 +1,61 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +// RateLimitingInterface is an Interface that can Add an item at a later time. This makes it easier to +// requeue items after failures without ending up in a hot-loop. +type RateLimitingInterface interface { + DelayingInterface + // AddRateLimited adds an item to the workqueue after the rate limiter says its ok + AddRateLimited(item interface{}) + + // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing + // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you + // still have to call `Done` on the queue. + Forget(item interface{}) + // NumRequeues returns back how many times the item was requeued + NumRequeues(item interface{}) int +} + +// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability +// Remember to call Forget! If you don't, you may end up tracking failures forever. +func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { + return &rateLimitingType{ + DelayingInterface: NewDelayingQueue(), + rateLimiter: rateLimiter, + } +} + +// rateLimitingType wraps an Interface and provides rateLimited re-enquing +type rateLimitingType struct { + DelayingInterface + + rateLimiter RateLimiter +} + +// AddRateLimited AddAfter's the item based on the time when the rate limiter says its ok +func (q *rateLimitingType) AddRateLimited(item interface{}) { + q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item)) +} + +func (q *rateLimitingType) NumRequeues(item interface{}) int { + return q.rateLimiter.NumRequeues(item) +} + +func (q *rateLimitingType) Forget(item interface{}) { + q.rateLimiter.Forget(item) +} diff --git a/pkg/util/workqueue/rate_limitting_queue_test.go b/pkg/util/workqueue/rate_limitting_queue_test.go new file mode 100644 index 00000000000..deca140a43b --- /dev/null +++ b/pkg/util/workqueue/rate_limitting_queue_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" + + "k8s.io/kubernetes/pkg/util" +) + +func TestRateLimitingQueue(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second) + queue := NewRateLimitingQueue(limiter).(*rateLimitingType) + fakeClock := util.NewFakeClock(time.Now()) + delayingQueue := &delayingType{ + Interface: New(), + clock: fakeClock, + heartbeat: fakeClock.Tick(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan waitFor, 1000), + } + queue.DelayingInterface = delayingQueue + + queue.AddRateLimited("one") + waitEntry := <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("one") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 10*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, queue.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + queue.AddRateLimited("two") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("two") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 10*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + queue.Forget("one") + if e, a := 0, queue.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("one") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +}