mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-06 17:40:57 +00:00
Switch from juju/ratelimit to golang.org/x/time/rate
Kubernetes-commit: 4b9f00988b9401f6c774d3d5e1bce45a8e8f81c8
This commit is contained in:
committed by
Kubernetes Publisher
parent
7392f7f78f
commit
acc52496ce
@@ -18,8 +18,9 @@ package flowcontrol
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/ratelimit"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type RateLimiter interface {
|
||||
@@ -35,7 +36,8 @@ type RateLimiter interface {
|
||||
}
|
||||
|
||||
type tokenBucketRateLimiter struct {
|
||||
limiter *ratelimit.Bucket
|
||||
limiter *rate.Limiter
|
||||
clock Clock
|
||||
qps float32
|
||||
}
|
||||
|
||||
@@ -45,36 +47,48 @@ type tokenBucketRateLimiter struct {
|
||||
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
|
||||
// The maximum number of tokens in the bucket is capped at 'burst'.
|
||||
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
|
||||
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
|
||||
return newTokenBucketRateLimiter(limiter, qps)
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
|
||||
}
|
||||
|
||||
// An injectable, mockable clock interface.
|
||||
type Clock interface {
|
||||
ratelimit.Clock
|
||||
Now() time.Time
|
||||
Sleep(time.Duration)
|
||||
}
|
||||
|
||||
type realClock struct{}
|
||||
|
||||
func (realClock) Now() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
func (realClock) Sleep(d time.Duration) {
|
||||
time.Sleep(d)
|
||||
}
|
||||
|
||||
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
|
||||
// but allows an injectable clock, for testing.
|
||||
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter {
|
||||
limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock)
|
||||
return newTokenBucketRateLimiter(limiter, qps)
|
||||
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiter(limiter, c, qps)
|
||||
}
|
||||
|
||||
func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter {
|
||||
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
|
||||
return &tokenBucketRateLimiter{
|
||||
limiter: limiter,
|
||||
clock: c,
|
||||
qps: qps,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tokenBucketRateLimiter) TryAccept() bool {
|
||||
return t.limiter.TakeAvailable(1) == 1
|
||||
return t.limiter.AllowN(t.clock.Now(), 1)
|
||||
}
|
||||
|
||||
// Accept will block until a token becomes available
|
||||
func (t *tokenBucketRateLimiter) Accept() {
|
||||
t.limiter.Wait(1)
|
||||
now := t.clock.Now()
|
||||
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
|
||||
}
|
||||
|
||||
func (t *tokenBucketRateLimiter) Stop() {
|
||||
|
Reference in New Issue
Block a user