mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-30 05:43:58 +00:00
Merge pull request #38320 from liggitt/golang-ratelimit
Automatic merge from submit-queue (batch tested with PRs 59158, 38320, 59059, 55516, 59357). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Switch from juju/ratelimit to golang.org/x/time/rate Replaces juju/ratelimit with golang.org/x/time/rate xref https://github.com/kubernetes/steering/issues/21 Requires removing the Saturation() method on the rate limiter. In the process of attempting to contribute it to the `golang.org/x/time/rate` implementation, it became clear that what it was calculating was not very useful when combined with periodic polling. See discussion in https://go-review.googlesource.com/c/time/+/29958#message-4caffc11669cadd90e2da4c05122cfec50ea6a22 ```release-note NONE ```
This commit is contained in:
@@ -230,10 +230,6 @@
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "13f86432b882000a51c6e610c620974462691a97"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/juju/ratelimit",
|
||||
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/mailru/easyjson/buffer",
|
||||
"Rev": "2f5df55504ebc322e4d52d34df6a1f5b503bf26d"
|
||||
@@ -406,6 +402,10 @@
|
||||
"ImportPath": "golang.org/x/text/width",
|
||||
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/time/rate",
|
||||
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/genproto/googleapis/api/annotations",
|
||||
"Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85"
|
||||
|
||||
4
staging/src/k8s.io/apiserver/Godeps/Godeps.json
generated
4
staging/src/k8s.io/apiserver/Godeps/Godeps.json
generated
@@ -558,10 +558,6 @@
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "13f86432b882000a51c6e610c620974462691a97"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/juju/ratelimit",
|
||||
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/mailru/easyjson/buffer",
|
||||
"Rev": "2f5df55504ebc322e4d52d34df6a1f5b503bf26d"
|
||||
|
||||
8
staging/src/k8s.io/client-go/Godeps/Godeps.json
generated
8
staging/src/k8s.io/client-go/Godeps/Godeps.json
generated
@@ -186,10 +186,6 @@
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "13f86432b882000a51c6e610c620974462691a97"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/juju/ratelimit",
|
||||
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/pmezard/go-difflib/difflib",
|
||||
"Rev": "d8ed2627bdf02c080bf22230dbb337003b7aba2d"
|
||||
@@ -274,6 +270,10 @@
|
||||
"ImportPath": "golang.org/x/text/unicode/norm",
|
||||
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/time/rate",
|
||||
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
||||
},
|
||||
{
|
||||
"ImportPath": "gopkg.in/inf.v0",
|
||||
"Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4"
|
||||
|
||||
@@ -25,7 +25,7 @@ go_library(
|
||||
],
|
||||
importpath = "k8s.io/client-go/util/flowcontrol",
|
||||
deps = [
|
||||
"//vendor/github.com/juju/ratelimit:go_default_library",
|
||||
"//vendor/golang.org/x/time/rate:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/integer:go_default_library",
|
||||
],
|
||||
|
||||
@@ -18,8 +18,9 @@ package flowcontrol
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/ratelimit"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type RateLimiter interface {
|
||||
@@ -30,17 +31,13 @@ type RateLimiter interface {
|
||||
Accept()
|
||||
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
|
||||
Stop()
|
||||
// Saturation returns a percentage number which describes how saturated
|
||||
// this rate limiter is.
|
||||
// Usually we use token bucket rate limiter. In that case,
|
||||
// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
|
||||
Saturation() float64
|
||||
// QPS returns QPS of this rate limiter
|
||||
QPS() float32
|
||||
}
|
||||
|
||||
type tokenBucketRateLimiter struct {
|
||||
limiter *ratelimit.Bucket
|
||||
limiter *rate.Limiter
|
||||
clock Clock
|
||||
qps float32
|
||||
}
|
||||
|
||||
@@ -50,42 +47,48 @@ type tokenBucketRateLimiter struct {
|
||||
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
|
||||
// The maximum number of tokens in the bucket is capped at 'burst'.
|
||||
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
|
||||
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
|
||||
return newTokenBucketRateLimiter(limiter, qps)
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
|
||||
}
|
||||
|
||||
// An injectable, mockable clock interface.
|
||||
type Clock interface {
|
||||
ratelimit.Clock
|
||||
Now() time.Time
|
||||
Sleep(time.Duration)
|
||||
}
|
||||
|
||||
type realClock struct{}
|
||||
|
||||
func (realClock) Now() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
func (realClock) Sleep(d time.Duration) {
|
||||
time.Sleep(d)
|
||||
}
|
||||
|
||||
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
|
||||
// but allows an injectable clock, for testing.
|
||||
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter {
|
||||
limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock)
|
||||
return newTokenBucketRateLimiter(limiter, qps)
|
||||
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiter(limiter, c, qps)
|
||||
}
|
||||
|
||||
func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter {
|
||||
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
|
||||
return &tokenBucketRateLimiter{
|
||||
limiter: limiter,
|
||||
clock: c,
|
||||
qps: qps,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tokenBucketRateLimiter) TryAccept() bool {
|
||||
return t.limiter.TakeAvailable(1) == 1
|
||||
}
|
||||
|
||||
func (t *tokenBucketRateLimiter) Saturation() float64 {
|
||||
capacity := t.limiter.Capacity()
|
||||
avail := t.limiter.Available()
|
||||
return float64(capacity-avail) / float64(capacity)
|
||||
return t.limiter.AllowN(t.clock.Now(), 1)
|
||||
}
|
||||
|
||||
// Accept will block until a token becomes available
|
||||
func (t *tokenBucketRateLimiter) Accept() {
|
||||
t.limiter.Wait(1)
|
||||
now := t.clock.Now()
|
||||
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
|
||||
}
|
||||
|
||||
func (t *tokenBucketRateLimiter) Stop() {
|
||||
@@ -105,10 +108,6 @@ func (t *fakeAlwaysRateLimiter) TryAccept() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *fakeAlwaysRateLimiter) Saturation() float64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (t *fakeAlwaysRateLimiter) Stop() {}
|
||||
|
||||
func (t *fakeAlwaysRateLimiter) Accept() {}
|
||||
@@ -131,10 +130,6 @@ func (t *fakeNeverRateLimiter) TryAccept() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *fakeNeverRateLimiter) Saturation() float64 {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (t *fakeNeverRateLimiter) Stop() {
|
||||
t.wg.Done()
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -116,29 +115,6 @@ func TestThrottle(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRateLimiterSaturation(t *testing.T) {
|
||||
const e = 0.000001
|
||||
tests := []struct {
|
||||
capacity int
|
||||
take int
|
||||
|
||||
expectedSaturation float64
|
||||
}{
|
||||
{1, 1, 1},
|
||||
{10, 3, 0.3},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
rl := NewTokenBucketRateLimiter(1, tt.capacity)
|
||||
for i := 0; i < tt.take; i++ {
|
||||
rl.Accept()
|
||||
}
|
||||
if math.Abs(rl.Saturation()-tt.expectedSaturation) > e {
|
||||
t.Fatalf("#%d: Saturation rate difference isn't within tolerable range\n want=%f, get=%f",
|
||||
i, tt.expectedSaturation, rl.Saturation())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlwaysFake(t *testing.T) {
|
||||
rl := NewFakeAlwaysRateLimiter()
|
||||
if !rl.TryAccept() {
|
||||
|
||||
@@ -34,7 +34,7 @@ go_library(
|
||||
],
|
||||
importpath = "k8s.io/client-go/util/workqueue",
|
||||
deps = [
|
||||
"//vendor/github.com/juju/ratelimit:go_default_library",
|
||||
"//vendor/golang.org/x/time/rate:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
],
|
||||
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/ratelimit"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type RateLimiter interface {
|
||||
@@ -40,19 +40,19 @@ func DefaultControllerRateLimiter() RateLimiter {
|
||||
return NewMaxOfRateLimiter(
|
||||
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
|
||||
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
|
||||
&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
|
||||
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
|
||||
)
|
||||
}
|
||||
|
||||
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
|
||||
type BucketRateLimiter struct {
|
||||
*ratelimit.Bucket
|
||||
*rate.Limiter
|
||||
}
|
||||
|
||||
var _ RateLimiter = &BucketRateLimiter{}
|
||||
|
||||
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
|
||||
return r.Bucket.Take(1)
|
||||
return r.Limiter.Reserve().Delay()
|
||||
}
|
||||
|
||||
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
|
||||
|
||||
@@ -210,10 +210,6 @@
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "13f86432b882000a51c6e610c620974462691a97"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/juju/ratelimit",
|
||||
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/mailru/easyjson/buffer",
|
||||
"Rev": "2f5df55504ebc322e4d52d34df6a1f5b503bf26d"
|
||||
@@ -382,6 +378,10 @@
|
||||
"ImportPath": "golang.org/x/text/width",
|
||||
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/time/rate",
|
||||
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/genproto/googleapis/api/annotations",
|
||||
"Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85"
|
||||
|
||||
12
staging/src/k8s.io/metrics/Godeps/Godeps.json
generated
12
staging/src/k8s.io/metrics/Godeps/Godeps.json
generated
@@ -62,14 +62,14 @@
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "13f86432b882000a51c6e610c620974462691a97"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/juju/ratelimit",
|
||||
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/spf13/pflag",
|
||||
"Rev": "4c012f6dcd9546820e378d0bdda4d8fc772cdfea"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/net/context",
|
||||
"Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/net/http2",
|
||||
"Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f"
|
||||
@@ -102,6 +102,10 @@
|
||||
"ImportPath": "golang.org/x/text/unicode/norm",
|
||||
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/time/rate",
|
||||
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
||||
},
|
||||
{
|
||||
"ImportPath": "gopkg.in/inf.v0",
|
||||
"Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4"
|
||||
|
||||
@@ -202,10 +202,6 @@
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "13f86432b882000a51c6e610c620974462691a97"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/juju/ratelimit",
|
||||
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/mailru/easyjson/buffer",
|
||||
"Rev": "2f5df55504ebc322e4d52d34df6a1f5b503bf26d"
|
||||
@@ -366,6 +362,10 @@
|
||||
"ImportPath": "golang.org/x/text/width",
|
||||
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/time/rate",
|
||||
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/genproto/googleapis/api/annotations",
|
||||
"Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85"
|
||||
|
||||
@@ -86,10 +86,6 @@
|
||||
"ImportPath": "github.com/json-iterator/go",
|
||||
"Rev": "13f86432b882000a51c6e610c620974462691a97"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/juju/ratelimit",
|
||||
"Rev": "5b9ff866471762aa2ab2dced63c9fb6f53921342"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/spf13/pflag",
|
||||
"Rev": "4c012f6dcd9546820e378d0bdda4d8fc772cdfea"
|
||||
@@ -142,6 +138,10 @@
|
||||
"ImportPath": "golang.org/x/text/unicode/norm",
|
||||
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/time/rate",
|
||||
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
||||
},
|
||||
{
|
||||
"ImportPath": "gopkg.in/inf.v0",
|
||||
"Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4"
|
||||
|
||||
Reference in New Issue
Block a user