Re-use juju ratelimit

Reverts changes in cebfc821a4
This commit is contained in:
Jordan Liggitt 2016-12-07 10:30:15 -05:00
parent 45cf32810d
commit 1c89a10556
No known key found for this signature in database
GPG Key ID: 24E7ADF9A3B42012
8 changed files with 14 additions and 379 deletions

View File

@ -20,7 +20,7 @@ go_library(
deps = [
"//pkg/util/clock:go_default_library",
"//pkg/util/integer:go_default_library",
"//pkg/util/ratelimit:go_default_library",
"//vendor:github.com/juju/ratelimit",
],
)

View File

@ -19,7 +19,7 @@ package flowcontrol
import (
"sync"
"k8s.io/kubernetes/pkg/util/ratelimit"
"github.com/juju/ratelimit"
)
type RateLimiter interface {

View File

@ -1,25 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["bucket.go"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["bucket_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [],
)

View File

@ -1,170 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ratelimit
import (
"math"
"sync"
"time"
)
// Bucket models a token bucket
type Bucket struct {
unitsPerNano float64
nanosPerUnit float64
capacity int64
mutex sync.Mutex
available int64
lastRefill int64
// fractionalAvailable "buffers" any amounts that flowed into the bucket smaller than one unit
// This lets us retain precision even with pathological refill rates like (1E9 + 1) per second
fractionalAvailable float64
}
// NewBucketWithRate creates a new token bucket, with maximum capacity = initial capacity, and a refill rate of qps
// We use floats for refill calculations, which introduces the possibility of truncation and rounding errors.
// For "sensible" qps values though, is is acceptable: jbeda did some tests here https://play.golang.org/p/LSKUOGz2LG
func NewBucketWithRate(qps float64, capacity int64) *Bucket {
unitsPerNano := qps / 1E9
nanosPerUnit := 1E9 / qps
b := &Bucket{
unitsPerNano: unitsPerNano,
nanosPerUnit: nanosPerUnit,
capacity: capacity,
available: capacity,
lastRefill: time.Now().UnixNano(),
}
return b
}
// Take takes n units from the bucket, reducing the available quantity even below zero,
// but then returns the amount of time we should wait
func (b *Bucket) Take(n int64) time.Duration {
b.mutex.Lock()
defer b.mutex.Unlock()
var d time.Duration
if b.available >= n {
// Fast path when bucket has sufficient availability before refilling
} else {
b.refill()
if b.available < n {
deficit := n - b.available
d = time.Duration(int64(float64(deficit) * b.nanosPerUnit))
}
}
b.available -= n
return d
}
// TakeAvailable immediately takes whatever quantity is available, up to max
func (b *Bucket) TakeAvailable(max int64) int64 {
b.mutex.Lock()
defer b.mutex.Unlock()
var took int64
if b.available >= max {
// Fast path when bucket has sufficient availability before refilling
took = max
} else {
b.refill()
took = b.available
if took < 0 {
took = 0
} else if took > max {
took = max
}
}
if took > 0 {
b.available -= took
}
return took
}
// Wait combines a call to Take with a sleep call
func (b *Bucket) Wait(n int64) {
d := b.Take(n)
if d != 0 {
time.Sleep(d)
}
}
// Capacity returns the maximum capacity of the bucket
func (b *Bucket) Capacity() int64 {
return b.capacity
}
// Available returns the quantity available in the bucket (which may be negative), but does not take it.
// This function is for diagnostic / informational purposes only - the returned capacity may immediately
// be inaccurate if another thread is operating on the bucket concurrently.
func (b *Bucket) Available() int64 {
b.mutex.Lock()
defer b.mutex.Unlock()
b.refill()
return b.available
}
// refill replenishes the bucket based on elapsed time; mutex must be held
func (b *Bucket) refill() {
// Note that we really want a monotonic clock here, but go says no:
// https://github.com/golang/go/issues/12914
now := time.Now().UnixNano()
b.refillAtTimestamp(now)
}
// refillAtTimestamp is the logic of the refill function, for testing
func (b *Bucket) refillAtTimestamp(now int64) {
nanosSinceLastRefill := now - b.lastRefill
if nanosSinceLastRefill <= 0 {
// we really want monotonic
return
}
// Compute units that have flowed into bucket
refillFloat := (float64(nanosSinceLastRefill) * b.unitsPerNano) + b.fractionalAvailable
if refillFloat > float64(b.capacity) {
// float64 > MaxInt64 can be converted to negative int64; side step this
b.available = b.capacity
// Don't worry about the fractional units with huge refill rates
} else {
whole, fraction := math.Modf(refillFloat)
refill := int64(whole)
b.fractionalAvailable = fraction
if refill != 0 {
// Refill with overflow
b.available += refill
if b.available >= b.capacity {
b.available = b.capacity
b.fractionalAvailable = 0
}
}
}
b.lastRefill = now
}

View File

@ -1,179 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ratelimit
import (
"testing"
"time"
)
func TestSimpleExhaustion(t *testing.T) {
capacity := int64(3)
b := NewBucketWithRate(1, capacity)
// Empty the bucket
for i := int64(0); i < capacity; i++ {
testAvailable(t, b, capacity-i)
testTakeNoDelay(t, b, 1)
}
testAvailable(t, b, 0)
// A take on an empty bucket should incur a delay
testTakeDelay(t, b, 1, 1*time.Second, 100*time.Millisecond)
testAvailable(t, b, -1)
}
func TestRefill(t *testing.T) {
capacity := int64(3)
b := NewBucketWithRate(1, capacity)
clock := b.lastRefill
// Empty the bucket
testAvailable(t, b, capacity)
for i := int64(0); i < capacity; i++ {
testTakeNoDelay(t, b, 1)
}
testAvailable(t, b, 0)
// In one second, one unit should be refilled
clock += time.Second.Nanoseconds()
b.refillAtTimestamp(clock)
testAvailable(t, b, 1)
testTakeNoDelay(t, b, 1)
testAvailable(t, b, 0)
// Partial refill periods don't result in lost time
for i := 0; i < 4; i++ {
clock += time.Millisecond.Nanoseconds() * 200
b.refillAtTimestamp(clock)
testAvailable(t, b, 0)
}
clock += time.Millisecond.Nanoseconds() * 200
b.refillAtTimestamp(clock)
testAvailable(t, b, 1)
testTakeNoDelay(t, b, 1)
testAvailable(t, b, 0)
}
// TestSlowRefillRate checks we don't have problems with tiny refill rates
func TestSlowRefillRate(t *testing.T) {
for _, capacity := range []int64{int64(1), int64(1E18)} {
b := NewBucketWithRate(1E-9, capacity)
clock := b.lastRefill
// Empty the bucket
testTakeNoDelay(t, b, b.available)
// In one second, should refill nothing
clock += time.Second.Nanoseconds()
b.refillAtTimestamp(clock)
testAvailable(t, b, 0)
// We need to have 1E18 nanos to see any refill
clock += 1E18
b.refillAtTimestamp(clock)
testAvailable(t, b, 1)
testTakeNoDelay(t, b, 1)
testAvailable(t, b, 0)
}
}
// TestFastRefillRate checks for refill rates that are around 1 / ns (our granularity)
func TestFastRefillRate(t *testing.T) {
for _, capacity := range []int64{int64(1), int64(1E18)} {
b := NewBucketWithRate(1E9, capacity)
// Empty the bucket
testTakeNoDelay(t, b, b.available)
// In one nanosecond, should refill exactly one unit
clock := b.lastRefill + 1
b.refillAtTimestamp(clock)
testAvailable(t, b, 1)
testTakeNoDelay(t, b, 1)
testAvailable(t, b, 0)
}
}
// TestRefillRatePrecision checks for rounding errors
func TestRefillRatePrecision(t *testing.T) {
capacity := int64(1E18)
b := NewBucketWithRate(1+1E9, capacity)
// Empty the bucket
testTakeNoDelay(t, b, b.available)
// In one nanosecond, should refill exactly one unit
clock := b.lastRefill + 1
b.refillAtTimestamp(clock)
testAvailable(t, b, 1)
testTakeNoDelay(t, b, 1)
testAvailable(t, b, 0)
// In one second, should refill the 1 extra also
clock += 1E9
b.refillAtTimestamp(clock)
testAvailable(t, b, 1000000001)
testTakeNoDelay(t, b, 1000000001)
testAvailable(t, b, 0)
}
// TestSlowRefillRate checks we don't have problems with ridiculously high refill rates
func TestHugeRefillRate(t *testing.T) {
for _, capacity := range []int64{int64(1), int64(1E18)} {
b := NewBucketWithRate(1E27, capacity)
// Empty the bucket
testTakeNoDelay(t, b, b.available)
// In one nanosecond, should refill to capacity
clock := b.lastRefill + 1
b.refillAtTimestamp(clock)
testAvailable(t, b, capacity)
testTakeNoDelay(t, b, capacity)
testAvailable(t, b, 0)
// In one second, should refill to capacity, but with huge overflow that must be discarded
clock += time.Second.Nanoseconds()
b.refillAtTimestamp(clock)
testAvailable(t, b, capacity)
testTakeNoDelay(t, b, capacity)
testAvailable(t, b, 0)
}
}
func testAvailable(t *testing.T, b *Bucket, expected int64) {
available := b.available
if available != expected {
t.Errorf("unexpected available; expected=%d, actual=%d", expected, available)
}
}
func testTakeDelay(t *testing.T, b *Bucket, take int64, expected time.Duration, tolerance time.Duration) {
actual := b.Take(take)
error := expected.Nanoseconds() - actual.Nanoseconds()
if error < 0 {
error = -error
}
if error > tolerance.Nanoseconds() {
t.Errorf("unexpected delay on take(%d); expected=%d, actual=%d", take, expected, actual)
}
}
func testTakeNoDelay(t *testing.T, b *Bucket, take int64) {
testTakeDelay(t, b, take, 0, 0)
}

View File

@ -25,8 +25,8 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/util/clock:go_default_library",
"//pkg/util/ratelimit:go_default_library",
"//pkg/util/runtime:go_default_library",
"//vendor:github.com/juju/ratelimit",
],
)

View File

@ -21,7 +21,7 @@ import (
"sync"
"time"
"k8s.io/kubernetes/pkg/util/ratelimit"
"github.com/juju/ratelimit"
)
type RateLimiter interface {
@ -35,7 +35,7 @@ type RateLimiter interface {
}
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),

9
vendor/BUILD vendored
View File

@ -4240,6 +4240,15 @@ go_binary(
deps = ["//vendor:github.com/jteeuwen/go-bindata"],
)
go_library(
name = "github.com/juju/ratelimit",
srcs = [
"github.com/juju/ratelimit/ratelimit.go",
"github.com/juju/ratelimit/reader.go",
],
tags = ["automanaged"],
)
go_library(
name = "github.com/kardianos/osext",
srcs = [