From 1c89a1055666f06e3e56182e87e6a1ec2d7d86d6 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 7 Dec 2016 10:30:15 -0500 Subject: [PATCH] Re-use juju ratelimit Reverts changes in cebfc821a4418a1c23b4bf7e1cff6967aff2df0f --- pkg/util/flowcontrol/BUILD | 2 +- pkg/util/flowcontrol/throttle.go | 2 +- pkg/util/ratelimit/BUILD | 25 --- pkg/util/ratelimit/bucket.go | 170 ------------------- pkg/util/ratelimit/bucket_test.go | 179 -------------------- pkg/util/workqueue/BUILD | 2 +- pkg/util/workqueue/default_rate_limiters.go | 4 +- vendor/BUILD | 9 + 8 files changed, 14 insertions(+), 379 deletions(-) delete mode 100644 pkg/util/ratelimit/BUILD delete mode 100644 pkg/util/ratelimit/bucket.go delete mode 100644 pkg/util/ratelimit/bucket_test.go diff --git a/pkg/util/flowcontrol/BUILD b/pkg/util/flowcontrol/BUILD index 3caecf81cf2..433cd4350e9 100644 --- a/pkg/util/flowcontrol/BUILD +++ b/pkg/util/flowcontrol/BUILD @@ -20,7 +20,7 @@ go_library( deps = [ "//pkg/util/clock:go_default_library", "//pkg/util/integer:go_default_library", - "//pkg/util/ratelimit:go_default_library", + "//vendor:github.com/juju/ratelimit", ], ) diff --git a/pkg/util/flowcontrol/throttle.go b/pkg/util/flowcontrol/throttle.go index 42170a18922..881a2f57d7d 100644 --- a/pkg/util/flowcontrol/throttle.go +++ b/pkg/util/flowcontrol/throttle.go @@ -19,7 +19,7 @@ package flowcontrol import ( "sync" - "k8s.io/kubernetes/pkg/util/ratelimit" + "github.com/juju/ratelimit" ) type RateLimiter interface { diff --git a/pkg/util/ratelimit/BUILD b/pkg/util/ratelimit/BUILD deleted file mode 100644 index 760940e2e89..00000000000 --- a/pkg/util/ratelimit/BUILD +++ /dev/null @@ -1,25 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_binary", - "go_library", - "go_test", - "cgo_library", -) - -go_library( - name = "go_default_library", - srcs = ["bucket.go"], - tags = ["automanaged"], -) - -go_test( - name = "go_default_test", - srcs = ["bucket_test.go"], - library = "go_default_library", - tags = ["automanaged"], - deps = [], -) diff --git a/pkg/util/ratelimit/bucket.go b/pkg/util/ratelimit/bucket.go deleted file mode 100644 index 752a251d05c..00000000000 --- a/pkg/util/ratelimit/bucket.go +++ /dev/null @@ -1,170 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package ratelimit - -import ( - "math" - "sync" - "time" -) - -// Bucket models a token bucket -type Bucket struct { - unitsPerNano float64 - nanosPerUnit float64 - capacity int64 - - mutex sync.Mutex - available int64 - lastRefill int64 - // fractionalAvailable "buffers" any amounts that flowed into the bucket smaller than one unit - // This lets us retain precision even with pathological refill rates like (1E9 + 1) per second - fractionalAvailable float64 -} - -// NewBucketWithRate creates a new token bucket, with maximum capacity = initial capacity, and a refill rate of qps -// We use floats for refill calculations, which introduces the possibility of truncation and rounding errors. -// For "sensible" qps values though, is is acceptable: jbeda did some tests here https://play.golang.org/p/LSKUOGz2LG -func NewBucketWithRate(qps float64, capacity int64) *Bucket { - unitsPerNano := qps / 1E9 - nanosPerUnit := 1E9 / qps - b := &Bucket{ - unitsPerNano: unitsPerNano, - nanosPerUnit: nanosPerUnit, - capacity: capacity, - available: capacity, - lastRefill: time.Now().UnixNano(), - } - return b -} - -// Take takes n units from the bucket, reducing the available quantity even below zero, -// but then returns the amount of time we should wait -func (b *Bucket) Take(n int64) time.Duration { - b.mutex.Lock() - defer b.mutex.Unlock() - - var d time.Duration - if b.available >= n { - // Fast path when bucket has sufficient availability before refilling - } else { - b.refill() - - if b.available < n { - deficit := n - b.available - d = time.Duration(int64(float64(deficit) * b.nanosPerUnit)) - } - } - - b.available -= n - - return d -} - -// TakeAvailable immediately takes whatever quantity is available, up to max -func (b *Bucket) TakeAvailable(max int64) int64 { - b.mutex.Lock() - defer b.mutex.Unlock() - - var took int64 - if b.available >= max { - // Fast path when bucket has sufficient availability before refilling - took = max - } else { - b.refill() - - took = b.available - - if took < 0 { - took = 0 - } else if took > max { - took = max - } - } - - if took > 0 { - b.available -= took - } - - return took -} - -// Wait combines a call to Take with a sleep call -func (b *Bucket) Wait(n int64) { - d := b.Take(n) - if d != 0 { - time.Sleep(d) - } -} - -// Capacity returns the maximum capacity of the bucket -func (b *Bucket) Capacity() int64 { - return b.capacity -} - -// Available returns the quantity available in the bucket (which may be negative), but does not take it. -// This function is for diagnostic / informational purposes only - the returned capacity may immediately -// be inaccurate if another thread is operating on the bucket concurrently. -func (b *Bucket) Available() int64 { - b.mutex.Lock() - defer b.mutex.Unlock() - - b.refill() - - return b.available -} - -// refill replenishes the bucket based on elapsed time; mutex must be held -func (b *Bucket) refill() { - // Note that we really want a monotonic clock here, but go says no: - // https://github.com/golang/go/issues/12914 - now := time.Now().UnixNano() - - b.refillAtTimestamp(now) -} - -// refillAtTimestamp is the logic of the refill function, for testing -func (b *Bucket) refillAtTimestamp(now int64) { - nanosSinceLastRefill := now - b.lastRefill - if nanosSinceLastRefill <= 0 { - // we really want monotonic - return - } - - // Compute units that have flowed into bucket - refillFloat := (float64(nanosSinceLastRefill) * b.unitsPerNano) + b.fractionalAvailable - if refillFloat > float64(b.capacity) { - // float64 > MaxInt64 can be converted to negative int64; side step this - b.available = b.capacity - - // Don't worry about the fractional units with huge refill rates - } else { - whole, fraction := math.Modf(refillFloat) - refill := int64(whole) - b.fractionalAvailable = fraction - if refill != 0 { - // Refill with overflow - b.available += refill - if b.available >= b.capacity { - b.available = b.capacity - b.fractionalAvailable = 0 - } - } - - } - b.lastRefill = now -} diff --git a/pkg/util/ratelimit/bucket_test.go b/pkg/util/ratelimit/bucket_test.go deleted file mode 100644 index 00a5b799ab8..00000000000 --- a/pkg/util/ratelimit/bucket_test.go +++ /dev/null @@ -1,179 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package ratelimit - -import ( - "testing" - "time" -) - -func TestSimpleExhaustion(t *testing.T) { - capacity := int64(3) - b := NewBucketWithRate(1, capacity) - - // Empty the bucket - for i := int64(0); i < capacity; i++ { - testAvailable(t, b, capacity-i) - testTakeNoDelay(t, b, 1) - } - testAvailable(t, b, 0) - - // A take on an empty bucket should incur a delay - testTakeDelay(t, b, 1, 1*time.Second, 100*time.Millisecond) - testAvailable(t, b, -1) -} - -func TestRefill(t *testing.T) { - capacity := int64(3) - b := NewBucketWithRate(1, capacity) - clock := b.lastRefill - - // Empty the bucket - testAvailable(t, b, capacity) - for i := int64(0); i < capacity; i++ { - testTakeNoDelay(t, b, 1) - } - testAvailable(t, b, 0) - - // In one second, one unit should be refilled - clock += time.Second.Nanoseconds() - b.refillAtTimestamp(clock) - testAvailable(t, b, 1) - testTakeNoDelay(t, b, 1) - testAvailable(t, b, 0) - - // Partial refill periods don't result in lost time - for i := 0; i < 4; i++ { - clock += time.Millisecond.Nanoseconds() * 200 - b.refillAtTimestamp(clock) - testAvailable(t, b, 0) - } - clock += time.Millisecond.Nanoseconds() * 200 - b.refillAtTimestamp(clock) - testAvailable(t, b, 1) - testTakeNoDelay(t, b, 1) - testAvailable(t, b, 0) -} - -// TestSlowRefillRate checks we don't have problems with tiny refill rates -func TestSlowRefillRate(t *testing.T) { - for _, capacity := range []int64{int64(1), int64(1E18)} { - b := NewBucketWithRate(1E-9, capacity) - clock := b.lastRefill - - // Empty the bucket - testTakeNoDelay(t, b, b.available) - - // In one second, should refill nothing - clock += time.Second.Nanoseconds() - b.refillAtTimestamp(clock) - testAvailable(t, b, 0) - - // We need to have 1E18 nanos to see any refill - clock += 1E18 - b.refillAtTimestamp(clock) - testAvailable(t, b, 1) - testTakeNoDelay(t, b, 1) - testAvailable(t, b, 0) - } -} - -// TestFastRefillRate checks for refill rates that are around 1 / ns (our granularity) -func TestFastRefillRate(t *testing.T) { - for _, capacity := range []int64{int64(1), int64(1E18)} { - b := NewBucketWithRate(1E9, capacity) - - // Empty the bucket - testTakeNoDelay(t, b, b.available) - - // In one nanosecond, should refill exactly one unit - clock := b.lastRefill + 1 - b.refillAtTimestamp(clock) - testAvailable(t, b, 1) - testTakeNoDelay(t, b, 1) - testAvailable(t, b, 0) - } -} - -// TestRefillRatePrecision checks for rounding errors -func TestRefillRatePrecision(t *testing.T) { - capacity := int64(1E18) - b := NewBucketWithRate(1+1E9, capacity) - - // Empty the bucket - testTakeNoDelay(t, b, b.available) - - // In one nanosecond, should refill exactly one unit - clock := b.lastRefill + 1 - b.refillAtTimestamp(clock) - testAvailable(t, b, 1) - testTakeNoDelay(t, b, 1) - testAvailable(t, b, 0) - - // In one second, should refill the 1 extra also - clock += 1E9 - b.refillAtTimestamp(clock) - testAvailable(t, b, 1000000001) - testTakeNoDelay(t, b, 1000000001) - testAvailable(t, b, 0) -} - -// TestSlowRefillRate checks we don't have problems with ridiculously high refill rates -func TestHugeRefillRate(t *testing.T) { - for _, capacity := range []int64{int64(1), int64(1E18)} { - b := NewBucketWithRate(1E27, capacity) - - // Empty the bucket - testTakeNoDelay(t, b, b.available) - - // In one nanosecond, should refill to capacity - clock := b.lastRefill + 1 - b.refillAtTimestamp(clock) - testAvailable(t, b, capacity) - testTakeNoDelay(t, b, capacity) - testAvailable(t, b, 0) - - // In one second, should refill to capacity, but with huge overflow that must be discarded - clock += time.Second.Nanoseconds() - b.refillAtTimestamp(clock) - testAvailable(t, b, capacity) - testTakeNoDelay(t, b, capacity) - testAvailable(t, b, 0) - } -} - -func testAvailable(t *testing.T, b *Bucket, expected int64) { - available := b.available - if available != expected { - t.Errorf("unexpected available; expected=%d, actual=%d", expected, available) - } -} - -func testTakeDelay(t *testing.T, b *Bucket, take int64, expected time.Duration, tolerance time.Duration) { - actual := b.Take(take) - error := expected.Nanoseconds() - actual.Nanoseconds() - if error < 0 { - error = -error - } - if error > tolerance.Nanoseconds() { - t.Errorf("unexpected delay on take(%d); expected=%d, actual=%d", take, expected, actual) - } -} - -func testTakeNoDelay(t *testing.T, b *Bucket, take int64) { - testTakeDelay(t, b, take, 0, 0) -} diff --git a/pkg/util/workqueue/BUILD b/pkg/util/workqueue/BUILD index af8eb00324f..8b8e98d92a6 100644 --- a/pkg/util/workqueue/BUILD +++ b/pkg/util/workqueue/BUILD @@ -25,8 +25,8 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/util/clock:go_default_library", - "//pkg/util/ratelimit:go_default_library", "//pkg/util/runtime:go_default_library", + "//vendor:github.com/juju/ratelimit", ], ) diff --git a/pkg/util/workqueue/default_rate_limiters.go b/pkg/util/workqueue/default_rate_limiters.go index 0b5b9bba29c..35caed4fa41 100644 --- a/pkg/util/workqueue/default_rate_limiters.go +++ b/pkg/util/workqueue/default_rate_limiters.go @@ -21,7 +21,7 @@ import ( "sync" "time" - "k8s.io/kubernetes/pkg/util/ratelimit" + "github.com/juju/ratelimit" ) type RateLimiter interface { @@ -35,7 +35,7 @@ type RateLimiter interface { } // DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has -// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential +// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential func DefaultControllerRateLimiter() RateLimiter { return NewMaxOfRateLimiter( NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), diff --git a/vendor/BUILD b/vendor/BUILD index aac6d8115c0..7f6a8f34863 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -4240,6 +4240,15 @@ go_binary( deps = ["//vendor:github.com/jteeuwen/go-bindata"], ) +go_library( + name = "github.com/juju/ratelimit", + srcs = [ + "github.com/juju/ratelimit/ratelimit.go", + "github.com/juju/ratelimit/reader.go", + ], + tags = ["automanaged"], +) + go_library( name = "github.com/kardianos/osext", srcs = [