diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b925b631beb..613f5ba2bbd 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -554,7 +554,7 @@ }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "772f5c38e468398c4511514f4f6aa9a4185bc0a0" + "Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177" }, { "ImportPath": "github.com/kardianos/osext", diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go index a36ffc74fec..3ef32fbcc09 100644 --- a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go @@ -8,10 +8,10 @@ package ratelimit import ( + "math" "strconv" "sync" "time" - "math" ) // Bucket represents a token bucket that fills at a predetermined rate. @@ -171,6 +171,30 @@ func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { return count } +// Available returns the number of available tokens. It will be negative +// when there are consumers waiting for tokens. Note that if this +// returns greater than zero, it does not guarantee that calls that take +// tokens from the buffer will succeed, as the number of available +// tokens could have changed in the meantime. This method is intended +// primarily for metrics reporting and debugging. +func (tb *Bucket) Available() int64 { + return tb.available(time.Now()) +} + +// available is the internal version of available - it takes the current time as +// an argument to enable easy testing. +func (tb *Bucket) available(now time.Time) int64 { + tb.mu.Lock() + defer tb.mu.Unlock() + tb.adjust(now) + return tb.avail +} + +// Capacity returns the capacity that the bucket was created with. +func (tb *Bucket) Capacity() int64 { + return tb.capacity +} + // Rate returns the fill rate of the bucket, in tokens per second. func (tb *Bucket) Rate() float64 { return 1e9 * float64(tb.quantum) / float64(tb.fillInterval) diff --git a/pkg/util/throttle.go b/pkg/util/throttle.go index b7d7785d90d..54a2fe58dae 100644 --- a/pkg/util/throttle.go +++ b/pkg/util/throttle.go @@ -26,9 +26,14 @@ type RateLimiter interface { Accept() // Stop stops the rate limiter, subsequent calls to CanAccept will return false Stop() + // Saturation returns a percentage number which describes how saturated + // this rate limiter is. + // Usually we use token bucket rate limiter. In that case, + // 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use. + Saturation() float64 } -type tickRateLimiter struct { +type tokenBucketRateLimiter struct { limiter *ratelimit.Bucket } @@ -39,7 +44,7 @@ type tickRateLimiter struct { // The maximum number of tokens in the bucket is capped at 'burst'. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst)) - return &tickRateLimiter{limiter} + return &tokenBucketRateLimiter{limiter} } type fakeRateLimiter struct{} @@ -48,22 +53,32 @@ func NewFakeRateLimiter() RateLimiter { return &fakeRateLimiter{} } -func (t *tickRateLimiter) TryAccept() bool { +func (t *tokenBucketRateLimiter) TryAccept() bool { return t.limiter.TakeAvailable(1) == 1 } +func (t *tokenBucketRateLimiter) Saturation() float64 { + capacity := t.limiter.Capacity() + avail := t.limiter.Available() + return float64(capacity-avail) / float64(capacity) +} + // Accept will block until a token becomes available -func (t *tickRateLimiter) Accept() { +func (t *tokenBucketRateLimiter) Accept() { t.limiter.Wait(1) } -func (t *tickRateLimiter) Stop() { +func (t *tokenBucketRateLimiter) Stop() { } func (t *fakeRateLimiter) TryAccept() bool { return true } +func (t *fakeRateLimiter) Saturation() float64 { + return 0 +} + func (t *fakeRateLimiter) Stop() {} func (t *fakeRateLimiter) Accept() {} diff --git a/pkg/util/throttle_test.go b/pkg/util/throttle_test.go index b75553df17a..089d7087191 100644 --- a/pkg/util/throttle_test.go +++ b/pkg/util/throttle_test.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + "math" "testing" "time" ) @@ -63,3 +64,26 @@ func TestThrottle(t *testing.T) { t.Error("rate limit was not respected, finished too early") } } + +func TestRateLimiterSaturation(t *testing.T) { + const e = 0.000001 + tests := []struct { + capacity int + take int + + expectedSaturation float64 + }{ + {1, 1, 1}, + {10, 3, 0.3}, + } + for i, tt := range tests { + rl := NewTokenBucketRateLimiter(1, tt.capacity) + for i := 0; i < tt.take; i++ { + rl.Accept() + } + if math.Abs(rl.Saturation()-tt.expectedSaturation) > e { + t.Fatalf("#%d: Saturation rate difference isn't within tolerable range\n want=%f, get=%f", + i, tt.expectedSaturation, rl.Saturation()) + } + } +} diff --git a/plugin/pkg/scheduler/metrics/metrics.go b/plugin/pkg/scheduler/metrics/metrics.go index 77bd64bfb32..db6258a31d9 100644 --- a/plugin/pkg/scheduler/metrics/metrics.go +++ b/plugin/pkg/scheduler/metrics/metrics.go @@ -25,6 +25,8 @@ import ( const schedulerSubsystem = "scheduler" +var BindingSaturationReportInterval = 1 * time.Second + var ( E2eSchedulingLatency = prometheus.NewSummary( prometheus.SummaryOpts{ @@ -50,6 +52,13 @@ var ( MaxAge: time.Hour, }, ) + BindingRateLimiterSaturation = prometheus.NewGauge( + prometheus.GaugeOpts{ + Subsystem: schedulerSubsystem, + Name: "binding_ratelimiter_saturation", + Help: "Binding rateLimiter's saturation rate in percentage", + }, + ) ) var registerMetrics sync.Once @@ -61,6 +70,7 @@ func Register() { prometheus.MustRegister(E2eSchedulingLatency) prometheus.MustRegister(SchedulingAlgorithmLatency) prometheus.MustRegister(BindingLatency) + prometheus.MustRegister(BindingRateLimiterSaturation) }) } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 2362f1eb0d6..cbfcbc8fd62 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -77,6 +77,7 @@ type Config struct { Binder Binder // Rate at which we can create pods + // If this field is nil, we don't have any rate limit. BindPodsRateLimiter util.RateLimiter // NextPod should be a function that blocks until the next pod @@ -107,6 +108,12 @@ func New(c *Config) *Scheduler { // Run begins watching and scheduling. It starts a goroutine and returns immediately. func (s *Scheduler) Run() { + if s.config.BindPodsRateLimiter != nil { + go util.Forever(func() { + sat := s.config.BindPodsRateLimiter.Saturation() + metrics.BindingRateLimiterSaturation.Set(sat) + }, metrics.BindingSaturationReportInterval) + } go util.Until(s.scheduleOne, 0, s.config.StopEverything) } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 94523bcdd92..715461a881b 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -307,6 +307,10 @@ func (fr *FakeRateLimiter) TryAccept() bool { return true } +func (fr *FakeRateLimiter) Saturation() float64 { + return 0 +} + func (fr *FakeRateLimiter) Stop() {} func (fr *FakeRateLimiter) Accept() {