mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #88846 from mborsz/revert-88261-rate
Revert "Implement ItemBucketRateLimiter"
This commit is contained in:
commit
e7e73f7364
@ -20,7 +20,6 @@ go_test(
|
|||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/golang.org/x/time/rate:go_default_library",
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,54 +62,6 @@ func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
|
|||||||
func (r *BucketRateLimiter) Forget(item interface{}) {
|
func (r *BucketRateLimiter) Forget(item interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ItemBucketRateLimiter implements a workqueue ratelimiter API using standard rate.Limiter.
|
|
||||||
// Each key is using a separate limiter.
|
|
||||||
type ItemBucketRateLimiter struct {
|
|
||||||
r rate.Limit
|
|
||||||
burst int
|
|
||||||
|
|
||||||
limitersLock sync.Mutex
|
|
||||||
limiters map[interface{}]*rate.Limiter
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ RateLimiter = &ItemBucketRateLimiter{}
|
|
||||||
|
|
||||||
// NewItemBucketRateLimiter creates new ItemBucketRateLimiter instance.
|
|
||||||
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
|
|
||||||
return &ItemBucketRateLimiter{
|
|
||||||
r: r,
|
|
||||||
burst: burst,
|
|
||||||
limiters: make(map[interface{}]*rate.Limiter),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// When returns a time.Duration which we need to wait before item is processed.
|
|
||||||
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
|
|
||||||
r.limitersLock.Lock()
|
|
||||||
defer r.limitersLock.Unlock()
|
|
||||||
|
|
||||||
limiter, ok := r.limiters[item]
|
|
||||||
if !ok {
|
|
||||||
limiter = rate.NewLimiter(r.r, r.burst)
|
|
||||||
r.limiters[item] = limiter
|
|
||||||
}
|
|
||||||
|
|
||||||
return limiter.Reserve().Delay()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NumRequeues returns always 0 (doesn't apply to ItemBucketRateLimiter).
|
|
||||||
func (r *ItemBucketRateLimiter) NumRequeues(item interface{}) int {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forget removes item from the internal state.
|
|
||||||
func (r *ItemBucketRateLimiter) Forget(item interface{}) {
|
|
||||||
r.limitersLock.Lock()
|
|
||||||
defer r.limitersLock.Unlock()
|
|
||||||
|
|
||||||
delete(r.limiters, item)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
|
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
|
||||||
// dealing with max failures and expiration are up to the caller
|
// dealing with max failures and expiration are up to the caller
|
||||||
type ItemExponentialFailureRateLimiter struct {
|
type ItemExponentialFailureRateLimiter struct {
|
||||||
|
@ -19,8 +19,6 @@ package workqueue
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestItemExponentialFailureRateLimiter(t *testing.T) {
|
func TestItemExponentialFailureRateLimiter(t *testing.T) {
|
||||||
@ -98,33 +96,6 @@ func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestItemBucketRateLimiter(t *testing.T) {
|
|
||||||
limiter := NewItemBucketRateLimiter(rate.Every(100*time.Millisecond), 1)
|
|
||||||
|
|
||||||
// Use initial burst.
|
|
||||||
if got := limiter.When("one"); got != 0 {
|
|
||||||
t.Errorf("limiter.When(two) = %v; want 0", got)
|
|
||||||
}
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
limiter.When("one")
|
|
||||||
}
|
|
||||||
// limiter.When should be at this point = 1000 * rate.Limit.
|
|
||||||
// We set the threshold 1s below this value to avoid race conditions.
|
|
||||||
if got, want := limiter.When("one"), 990*100*time.Millisecond; got < want {
|
|
||||||
t.Errorf("limiter.When(one) = %v; want at least %v", got, want)
|
|
||||||
}
|
|
||||||
|
|
||||||
if got := limiter.When("two"); got != 0 {
|
|
||||||
t.Errorf("limiter.When(two) = %v; want 0", got)
|
|
||||||
}
|
|
||||||
|
|
||||||
limiter.Forget("one")
|
|
||||||
// Use new budget.
|
|
||||||
if got := limiter.When("one"); got != 0 {
|
|
||||||
t.Errorf("limiter.When(two) = %v; want 0", got)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestItemFastSlowRateLimiter(t *testing.T) {
|
func TestItemFastSlowRateLimiter(t *testing.T) {
|
||||||
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
|
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user