From 6846a0a6b62725a5888d66e2789f2af4fb172045 Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Tue, 18 Feb 2020 11:24:20 +0100 Subject: [PATCH] Implement ItemBucketRateLimiter --- .../src/k8s.io/client-go/util/workqueue/BUILD | 1 + .../util/workqueue/default_rate_limiters.go | 48 +++++++++++++++++++ .../workqueue/default_rate_limiters_test.go | 29 +++++++++++ 3 files changed, 78 insertions(+) diff --git a/staging/src/k8s.io/client-go/util/workqueue/BUILD b/staging/src/k8s.io/client-go/util/workqueue/BUILD index 538ad31cde9..264cead1da3 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/BUILD +++ b/staging/src/k8s.io/client-go/util/workqueue/BUILD @@ -20,6 +20,7 @@ go_test( deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/golang.org/x/time/rate:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go b/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go index 71bb6322e07..6dc8ec5f225 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go +++ b/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go @@ -62,6 +62,54 @@ func (r *BucketRateLimiter) NumRequeues(item interface{}) int { func (r *BucketRateLimiter) Forget(item interface{}) { } +// ItemBucketRateLimiter implements a workqueue ratelimiter API using standard rate.Limiter. +// Each key is using a separate limiter. +type ItemBucketRateLimiter struct { + r rate.Limit + burst int + + limitersLock sync.Mutex + limiters map[interface{}]*rate.Limiter +} + +var _ RateLimiter = &ItemBucketRateLimiter{} + +// NewItemBucketRateLimiter creates new ItemBucketRateLimiter instance. +func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter { + return &ItemBucketRateLimiter{ + r: r, + burst: burst, + limiters: make(map[interface{}]*rate.Limiter), + } +} + +// When returns a time.Duration which we need to wait before item is processed. +func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration { + r.limitersLock.Lock() + defer r.limitersLock.Unlock() + + limiter, ok := r.limiters[item] + if !ok { + limiter = rate.NewLimiter(r.r, r.burst) + r.limiters[item] = limiter + } + + return limiter.Reserve().Delay() +} + +// NumRequeues returns always 0 (doesn't apply to ItemBucketRateLimiter). +func (r *ItemBucketRateLimiter) NumRequeues(item interface{}) int { + return 0 +} + +// Forget removes item from the internal state. +func (r *ItemBucketRateLimiter) Forget(item interface{}) { + r.limitersLock.Lock() + defer r.limitersLock.Unlock() + + delete(r.limiters, item) +} + // ItemExponentialFailureRateLimiter does a simple baseDelay*2^ limit // dealing with max failures and expiration are up to the caller type ItemExponentialFailureRateLimiter struct { diff --git a/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters_test.go b/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters_test.go index 91d34a31779..9d7ab747e4e 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters_test.go @@ -19,6 +19,8 @@ package workqueue import ( "testing" "time" + + "golang.org/x/time/rate" ) func TestItemExponentialFailureRateLimiter(t *testing.T) { @@ -96,6 +98,33 @@ func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) { } +func TestItemBucketRateLimiter(t *testing.T) { + limiter := NewItemBucketRateLimiter(rate.Every(100*time.Millisecond), 1) + + // Use initial burst. + if got := limiter.When("one"); got != 0 { + t.Errorf("limiter.When(two) = %v; want 0", got) + } + for i := 0; i < 1000; i++ { + limiter.When("one") + } + // limiter.When should be at this point = 1000 * rate.Limit. + // We set the threshold 1s below this value to avoid race conditions. + if got, want := limiter.When("one"), 990*100*time.Millisecond; got < want { + t.Errorf("limiter.When(one) = %v; want at least %v", got, want) + } + + if got := limiter.When("two"); got != 0 { + t.Errorf("limiter.When(two) = %v; want 0", got) + } + + limiter.Forget("one") + // Use new budget. + if got := limiter.When("one"); got != 0 { + t.Errorf("limiter.When(two) = %v; want 0", got) + } +} + func TestItemFastSlowRateLimiter(t *testing.T) { limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)