mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Merge pull request #17673 from hongchaodeng/metrics
Auto commit by PR queue bot
This commit is contained in:
commit
3180b00f6c
2
Godeps/Godeps.json
generated
2
Godeps/Godeps.json
generated
@ -554,7 +554,7 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/juju/ratelimit",
|
"ImportPath": "github.com/juju/ratelimit",
|
||||||
"Rev": "772f5c38e468398c4511514f4f6aa9a4185bc0a0"
|
"Rev": "77ed1c8a01217656d2080ad51981f6e99adaa177"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/kardianos/osext",
|
"ImportPath": "github.com/kardianos/osext",
|
||||||
|
26
Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go
generated
vendored
26
Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go
generated
vendored
@ -8,10 +8,10 @@
|
|||||||
package ratelimit
|
package ratelimit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
"math"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bucket represents a token bucket that fills at a predetermined rate.
|
// Bucket represents a token bucket that fills at a predetermined rate.
|
||||||
@ -171,6 +171,30 @@ func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
|
|||||||
return count
|
return count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Available returns the number of available tokens. It will be negative
|
||||||
|
// when there are consumers waiting for tokens. Note that if this
|
||||||
|
// returns greater than zero, it does not guarantee that calls that take
|
||||||
|
// tokens from the buffer will succeed, as the number of available
|
||||||
|
// tokens could have changed in the meantime. This method is intended
|
||||||
|
// primarily for metrics reporting and debugging.
|
||||||
|
func (tb *Bucket) Available() int64 {
|
||||||
|
return tb.available(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
// available is the internal version of available - it takes the current time as
|
||||||
|
// an argument to enable easy testing.
|
||||||
|
func (tb *Bucket) available(now time.Time) int64 {
|
||||||
|
tb.mu.Lock()
|
||||||
|
defer tb.mu.Unlock()
|
||||||
|
tb.adjust(now)
|
||||||
|
return tb.avail
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity returns the capacity that the bucket was created with.
|
||||||
|
func (tb *Bucket) Capacity() int64 {
|
||||||
|
return tb.capacity
|
||||||
|
}
|
||||||
|
|
||||||
// Rate returns the fill rate of the bucket, in tokens per second.
|
// Rate returns the fill rate of the bucket, in tokens per second.
|
||||||
func (tb *Bucket) Rate() float64 {
|
func (tb *Bucket) Rate() float64 {
|
||||||
return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
|
return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
|
||||||
|
@ -26,9 +26,14 @@ type RateLimiter interface {
|
|||||||
Accept()
|
Accept()
|
||||||
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
|
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
|
||||||
Stop()
|
Stop()
|
||||||
|
// Saturation returns a percentage number which describes how saturated
|
||||||
|
// this rate limiter is.
|
||||||
|
// Usually we use token bucket rate limiter. In that case,
|
||||||
|
// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
|
||||||
|
Saturation() float64
|
||||||
}
|
}
|
||||||
|
|
||||||
type tickRateLimiter struct {
|
type tokenBucketRateLimiter struct {
|
||||||
limiter *ratelimit.Bucket
|
limiter *ratelimit.Bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,7 +44,7 @@ type tickRateLimiter struct {
|
|||||||
// The maximum number of tokens in the bucket is capped at 'burst'.
|
// The maximum number of tokens in the bucket is capped at 'burst'.
|
||||||
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
|
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
|
||||||
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
|
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
|
||||||
return &tickRateLimiter{limiter}
|
return &tokenBucketRateLimiter{limiter}
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeRateLimiter struct{}
|
type fakeRateLimiter struct{}
|
||||||
@ -48,22 +53,32 @@ func NewFakeRateLimiter() RateLimiter {
|
|||||||
return &fakeRateLimiter{}
|
return &fakeRateLimiter{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tickRateLimiter) TryAccept() bool {
|
func (t *tokenBucketRateLimiter) TryAccept() bool {
|
||||||
return t.limiter.TakeAvailable(1) == 1
|
return t.limiter.TakeAvailable(1) == 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *tokenBucketRateLimiter) Saturation() float64 {
|
||||||
|
capacity := t.limiter.Capacity()
|
||||||
|
avail := t.limiter.Available()
|
||||||
|
return float64(capacity-avail) / float64(capacity)
|
||||||
|
}
|
||||||
|
|
||||||
// Accept will block until a token becomes available
|
// Accept will block until a token becomes available
|
||||||
func (t *tickRateLimiter) Accept() {
|
func (t *tokenBucketRateLimiter) Accept() {
|
||||||
t.limiter.Wait(1)
|
t.limiter.Wait(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tickRateLimiter) Stop() {
|
func (t *tokenBucketRateLimiter) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *fakeRateLimiter) TryAccept() bool {
|
func (t *fakeRateLimiter) TryAccept() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *fakeRateLimiter) Saturation() float64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
func (t *fakeRateLimiter) Stop() {}
|
func (t *fakeRateLimiter) Stop() {}
|
||||||
|
|
||||||
func (t *fakeRateLimiter) Accept() {}
|
func (t *fakeRateLimiter) Accept() {}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -63,3 +64,26 @@ func TestThrottle(t *testing.T) {
|
|||||||
t.Error("rate limit was not respected, finished too early")
|
t.Error("rate limit was not respected, finished too early")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRateLimiterSaturation(t *testing.T) {
|
||||||
|
const e = 0.000001
|
||||||
|
tests := []struct {
|
||||||
|
capacity int
|
||||||
|
take int
|
||||||
|
|
||||||
|
expectedSaturation float64
|
||||||
|
}{
|
||||||
|
{1, 1, 1},
|
||||||
|
{10, 3, 0.3},
|
||||||
|
}
|
||||||
|
for i, tt := range tests {
|
||||||
|
rl := NewTokenBucketRateLimiter(1, tt.capacity)
|
||||||
|
for i := 0; i < tt.take; i++ {
|
||||||
|
rl.Accept()
|
||||||
|
}
|
||||||
|
if math.Abs(rl.Saturation()-tt.expectedSaturation) > e {
|
||||||
|
t.Fatalf("#%d: Saturation rate difference isn't within tolerable range\n want=%f, get=%f",
|
||||||
|
i, tt.expectedSaturation, rl.Saturation())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -25,6 +25,8 @@ import (
|
|||||||
|
|
||||||
const schedulerSubsystem = "scheduler"
|
const schedulerSubsystem = "scheduler"
|
||||||
|
|
||||||
|
var BindingSaturationReportInterval = 1 * time.Second
|
||||||
|
|
||||||
var (
|
var (
|
||||||
E2eSchedulingLatency = prometheus.NewSummary(
|
E2eSchedulingLatency = prometheus.NewSummary(
|
||||||
prometheus.SummaryOpts{
|
prometheus.SummaryOpts{
|
||||||
@ -50,6 +52,13 @@ var (
|
|||||||
MaxAge: time.Hour,
|
MaxAge: time.Hour,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
BindingRateLimiterSaturation = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Subsystem: schedulerSubsystem,
|
||||||
|
Name: "binding_ratelimiter_saturation",
|
||||||
|
Help: "Binding rateLimiter's saturation rate in percentage",
|
||||||
|
},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
var registerMetrics sync.Once
|
var registerMetrics sync.Once
|
||||||
@ -61,6 +70,7 @@ func Register() {
|
|||||||
prometheus.MustRegister(E2eSchedulingLatency)
|
prometheus.MustRegister(E2eSchedulingLatency)
|
||||||
prometheus.MustRegister(SchedulingAlgorithmLatency)
|
prometheus.MustRegister(SchedulingAlgorithmLatency)
|
||||||
prometheus.MustRegister(BindingLatency)
|
prometheus.MustRegister(BindingLatency)
|
||||||
|
prometheus.MustRegister(BindingRateLimiterSaturation)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,6 +77,7 @@ type Config struct {
|
|||||||
Binder Binder
|
Binder Binder
|
||||||
|
|
||||||
// Rate at which we can create pods
|
// Rate at which we can create pods
|
||||||
|
// If this field is nil, we don't have any rate limit.
|
||||||
BindPodsRateLimiter util.RateLimiter
|
BindPodsRateLimiter util.RateLimiter
|
||||||
|
|
||||||
// NextPod should be a function that blocks until the next pod
|
// NextPod should be a function that blocks until the next pod
|
||||||
@ -107,6 +108,12 @@ func New(c *Config) *Scheduler {
|
|||||||
|
|
||||||
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
|
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
|
||||||
func (s *Scheduler) Run() {
|
func (s *Scheduler) Run() {
|
||||||
|
if s.config.BindPodsRateLimiter != nil {
|
||||||
|
go util.Forever(func() {
|
||||||
|
sat := s.config.BindPodsRateLimiter.Saturation()
|
||||||
|
metrics.BindingRateLimiterSaturation.Set(sat)
|
||||||
|
}, metrics.BindingSaturationReportInterval)
|
||||||
|
}
|
||||||
go util.Until(s.scheduleOne, 0, s.config.StopEverything)
|
go util.Until(s.scheduleOne, 0, s.config.StopEverything)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -307,6 +307,10 @@ func (fr *FakeRateLimiter) TryAccept() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fr *FakeRateLimiter) Saturation() float64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
func (fr *FakeRateLimiter) Stop() {}
|
func (fr *FakeRateLimiter) Stop() {}
|
||||||
|
|
||||||
func (fr *FakeRateLimiter) Accept() {
|
func (fr *FakeRateLimiter) Accept() {
|
||||||
|
Loading…
Reference in New Issue
Block a user