mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #24052 from deads2k/rate-limitting-requeue
Automatic merge from submit-queue Rate limitting requeue I think this will do what I want in almost every case. I'm going to try it out for a single writer quota evaluation. `NewRateLimitingQueue(NewControllerRateLimiter(qps, burst))` gives an `AddRateLimited(item)` that backs off based on the max of a bucket and exponential backoff per item. @liggitt Want to see if the interface works for you too? <!-- Reviewable:start --> --- This change is [<img src="http://reviewable.k8s.io/review_button.svg" height="35" align="absmiddle" alt="Reviewable"/>](http://reviewable.k8s.io/reviews/kubernetes/kubernetes/24052) <!-- Reviewable:end -->
This commit is contained in:
commit
203b5b7c5f
204
pkg/util/workqueue/default_rate_limiters.go
Normal file
204
pkg/util/workqueue/default_rate_limiters.go
Normal file
@ -0,0 +1,204 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/ratelimit"
|
||||
)
|
||||
|
||||
type RateLimiter interface {
|
||||
// When gets an item and gets to decide how long that item should wait
|
||||
When(item interface{}) time.Duration
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
|
||||
// or for success, we'll stop tracking it
|
||||
Forget(item interface{})
|
||||
// NumRequeues returns back how many failures the item has had
|
||||
NumRequeues(item interface{}) int
|
||||
}
|
||||
|
||||
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
|
||||
// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential
|
||||
func DefaultControllerRateLimiter() RateLimiter {
|
||||
return NewMaxOfRateLimiter(
|
||||
DefaultItemBasedRateLimiter(),
|
||||
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
|
||||
&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
|
||||
)
|
||||
}
|
||||
|
||||
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
|
||||
type BucketRateLimiter struct {
|
||||
*ratelimit.Bucket
|
||||
}
|
||||
|
||||
var _ RateLimiter = &BucketRateLimiter{}
|
||||
|
||||
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
|
||||
return r.Bucket.Take(1)
|
||||
}
|
||||
|
||||
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r *BucketRateLimiter) Forget(item interface{}) {
|
||||
}
|
||||
|
||||
// ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit
|
||||
// dealing with max failures and expiration are up to the caller
|
||||
type ItemExponentialFailureRateLimiter struct {
|
||||
failuresLock sync.Mutex
|
||||
failures map[interface{}]int
|
||||
|
||||
baseDelay time.Duration
|
||||
maxDelay time.Duration
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
|
||||
|
||||
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
|
||||
return &ItemExponentialFailureRateLimiter{
|
||||
failures: map[interface{}]int{},
|
||||
baseDelay: baseDelay,
|
||||
maxDelay: maxDelay,
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultItemBasedRateLimiter() RateLimiter {
|
||||
return NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second)
|
||||
}
|
||||
|
||||
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
r.failures[item] = r.failures[item] + 1
|
||||
|
||||
calculated := r.baseDelay * time.Duration(math.Pow10(r.failures[item]-1))
|
||||
if calculated > r.maxDelay {
|
||||
return r.maxDelay
|
||||
}
|
||||
|
||||
return calculated
|
||||
}
|
||||
|
||||
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
return r.failures[item]
|
||||
}
|
||||
|
||||
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
delete(r.failures, item)
|
||||
}
|
||||
|
||||
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
|
||||
type ItemFastSlowRateLimiter struct {
|
||||
failuresLock sync.Mutex
|
||||
failures map[interface{}]int
|
||||
|
||||
maxFastAttempts int
|
||||
fastDelay time.Duration
|
||||
slowDelay time.Duration
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemFastSlowRateLimiter{}
|
||||
|
||||
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
|
||||
return &ItemFastSlowRateLimiter{
|
||||
failures: map[interface{}]int{},
|
||||
fastDelay: fastDelay,
|
||||
slowDelay: slowDelay,
|
||||
maxFastAttempts: maxFastAttempts,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
r.failures[item] = r.failures[item] + 1
|
||||
|
||||
if r.failures[item] <= r.maxFastAttempts {
|
||||
return r.fastDelay
|
||||
}
|
||||
|
||||
return r.slowDelay
|
||||
}
|
||||
|
||||
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
return r.failures[item]
|
||||
}
|
||||
|
||||
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
delete(r.failures, item)
|
||||
}
|
||||
|
||||
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
|
||||
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
|
||||
// were separately delayed a longer time.
|
||||
type MaxOfRateLimiter struct {
|
||||
limiters []RateLimiter
|
||||
}
|
||||
|
||||
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
|
||||
ret := time.Duration(0)
|
||||
for _, limiter := range r.limiters {
|
||||
curr := limiter.When(item)
|
||||
if curr > ret {
|
||||
ret = curr
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
|
||||
return &MaxOfRateLimiter{limiters: limiters}
|
||||
}
|
||||
|
||||
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
|
||||
ret := 0
|
||||
for _, limiter := range r.limiters {
|
||||
curr := limiter.NumRequeues(item)
|
||||
if curr > ret {
|
||||
ret = curr
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (r *MaxOfRateLimiter) Forget(item interface{}) {
|
||||
for _, limiter := range r.limiters {
|
||||
limiter.Forget(item)
|
||||
}
|
||||
}
|
151
pkg/util/workqueue/default_rate_limiters_test.go
Normal file
151
pkg/util/workqueue/default_rate_limiters_test.go
Normal file
@ -0,0 +1,151 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestItemExponentialFailureRateLimiter(t *testing.T) {
|
||||
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
|
||||
|
||||
if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 100*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 1*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 1*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := 1*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, limiter.NumRequeues("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
limiter.Forget("one")
|
||||
if e, a := 0, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestItemFastSlowRateLimiter(t *testing.T) {
|
||||
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, limiter.NumRequeues("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
limiter.Forget("one")
|
||||
if e, a := 0, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMaxOfRateLimiter(t *testing.T) {
|
||||
limiter := NewMaxOfRateLimiter(
|
||||
NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3),
|
||||
NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second),
|
||||
)
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 100*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 3*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 3*time.Second, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 10*time.Millisecond, limiter.When("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, limiter.NumRequeues("two"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
limiter.Forget("one")
|
||||
if e, a := 0, limiter.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
61
pkg/util/workqueue/rate_limitting_queue.go
Normal file
61
pkg/util/workqueue/rate_limitting_queue.go
Normal file
@ -0,0 +1,61 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
// RateLimitingInterface is an Interface that can Add an item at a later time. This makes it easier to
|
||||
// requeue items after failures without ending up in a hot-loop.
|
||||
type RateLimitingInterface interface {
|
||||
DelayingInterface
|
||||
// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
|
||||
AddRateLimited(item interface{})
|
||||
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
|
||||
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
|
||||
// still have to call `Done` on the queue.
|
||||
Forget(item interface{})
|
||||
// NumRequeues returns back how many times the item was requeued
|
||||
NumRequeues(item interface{}) int
|
||||
}
|
||||
|
||||
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
||||
return &rateLimitingType{
|
||||
DelayingInterface: NewDelayingQueue(),
|
||||
rateLimiter: rateLimiter,
|
||||
}
|
||||
}
|
||||
|
||||
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
||||
type rateLimitingType struct {
|
||||
DelayingInterface
|
||||
|
||||
rateLimiter RateLimiter
|
||||
}
|
||||
|
||||
// AddRateLimited AddAfter's the item based on the time when the rate limiter says its ok
|
||||
func (q *rateLimitingType) AddRateLimited(item interface{}) {
|
||||
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
|
||||
}
|
||||
|
||||
func (q *rateLimitingType) NumRequeues(item interface{}) int {
|
||||
return q.rateLimiter.NumRequeues(item)
|
||||
}
|
||||
|
||||
func (q *rateLimitingType) Forget(item interface{}) {
|
||||
q.rateLimiter.Forget(item)
|
||||
}
|
74
pkg/util/workqueue/rate_limitting_queue_test.go
Normal file
74
pkg/util/workqueue/rate_limitting_queue_test.go
Normal file
@ -0,0 +1,74 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
func TestRateLimitingQueue(t *testing.T) {
|
||||
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
|
||||
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
|
||||
fakeClock := util.NewFakeClock(time.Now())
|
||||
delayingQueue := &delayingType{
|
||||
Interface: New(),
|
||||
clock: fakeClock,
|
||||
heartbeat: fakeClock.Tick(maxWait),
|
||||
stopCh: make(chan struct{}),
|
||||
waitingForAddCh: make(chan waitFor, 1000),
|
||||
}
|
||||
queue.DelayingInterface = delayingQueue
|
||||
|
||||
queue.AddRateLimited("one")
|
||||
waitEntry := <-delayingQueue.waitingForAddCh
|
||||
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
queue.AddRateLimited("one")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 10*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := 2, queue.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
queue.AddRateLimited("two")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
queue.AddRateLimited("two")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 10*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
queue.Forget("one")
|
||||
if e, a := 0, queue.NumRequeues("one"); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
queue.AddRateLimited("one")
|
||||
waitEntry = <-delayingQueue.waitingForAddCh
|
||||
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user