mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-04 10:46:16 +00:00
Improve rate limiter latency logging and metrics
Kubernetes-commit: 2bcf99f05fdc47fb4bc3601b9134408483f59773
This commit is contained in:
parent
f772958f8a
commit
2d3138825e
@ -39,6 +39,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
||||
utilclock "k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclientwatch "k8s.io/client-go/rest/watch"
|
||||
@ -556,36 +557,68 @@ func (r *Request) tryThrottle(ctx context.Context) error {
|
||||
klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
|
||||
}
|
||||
if latency > extraLongThrottleLatency {
|
||||
globalThrottledLogger.Log(2, fmt.Sprintf("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String()))
|
||||
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
|
||||
// but we use a throttled logger to prevent spamming.
|
||||
globalThrottledLogger.Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
|
||||
}
|
||||
metrics.RateLimiterLatency.Observe(r.verb, r.finalURLTemplate(), latency)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type throttledLogger struct {
|
||||
logTimeLock sync.RWMutex
|
||||
lastLogTime time.Time
|
||||
type throttleSettings struct {
|
||||
logLevel klog.Level
|
||||
minLogInterval time.Duration
|
||||
|
||||
lastLogTime time.Time
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
type throttledLogger struct {
|
||||
clock utilclock.PassiveClock
|
||||
settings []*throttleSettings
|
||||
}
|
||||
|
||||
var globalThrottledLogger = &throttledLogger{
|
||||
clock: utilclock.RealClock{},
|
||||
settings: []*throttleSettings{
|
||||
{
|
||||
logLevel: 2,
|
||||
minLogInterval: 1 * time.Second,
|
||||
}, {
|
||||
logLevel: 0,
|
||||
minLogInterval: 10 * time.Second,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func (b *throttledLogger) Log(level klog.Level, message string) {
|
||||
if bool(klog.V(level)) {
|
||||
func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
|
||||
for _, setting := range b.settings {
|
||||
if bool(klog.V(setting.logLevel)) {
|
||||
// Return early without write locking if possible.
|
||||
if func() bool {
|
||||
b.logTimeLock.RLock()
|
||||
defer b.logTimeLock.RUnlock()
|
||||
return time.Since(b.lastLogTime) > b.minLogInterval
|
||||
setting.lock.RLock()
|
||||
defer setting.lock.RUnlock()
|
||||
return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
|
||||
}() {
|
||||
b.logTimeLock.Lock()
|
||||
defer b.logTimeLock.Unlock()
|
||||
if time.Since(b.lastLogTime) > b.minLogInterval {
|
||||
klog.V(level).Info(message)
|
||||
b.lastLogTime = time.Now()
|
||||
setting.lock.Lock()
|
||||
defer setting.lock.Unlock()
|
||||
if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
|
||||
setting.lastLogTime = b.clock.Now()
|
||||
return setting.logLevel, true
|
||||
}
|
||||
}
|
||||
return -1, false
|
||||
}
|
||||
}
|
||||
return -1, false
|
||||
}
|
||||
|
||||
// Infof will write a log message at each logLevel specified by the reciever's throttleSettings
|
||||
// as long as it hasn't written a log message more recently than minLogInterval.
|
||||
func (b *throttledLogger) Infof(message string, args ...interface{}) {
|
||||
if logLevel, ok := b.attemptToLog(); ok {
|
||||
klog.V(logLevel).Infof(message, args...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
@ -2190,3 +2191,30 @@ func TestRequestPreflightCheck(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestThrottledLogger(t *testing.T) {
|
||||
now := time.Now()
|
||||
clock := clock.NewFakeClock(now)
|
||||
globalThrottledLogger.clock = clock
|
||||
|
||||
logMessages := 0
|
||||
for i := 0; i < 10000; i++ {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(100)
|
||||
for j := 0; j < 100; j++ {
|
||||
go func() {
|
||||
if _, ok := globalThrottledLogger.attemptToLog(); ok {
|
||||
logMessages++
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
now = now.Add(1 * time.Second)
|
||||
clock.SetTime(now)
|
||||
}
|
||||
|
||||
if a, e := logMessages, 1000; a != e {
|
||||
t.Fatalf("expected %v log messages, but got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
@ -53,6 +53,8 @@ var (
|
||||
ClientCertRotationAge DurationMetric = noopDuration{}
|
||||
// RequestLatency is the latency metric that rest clients will update.
|
||||
RequestLatency LatencyMetric = noopLatency{}
|
||||
// RateLimiterLatency is the client side rate limiter latency metric.
|
||||
RateLimiterLatency LatencyMetric = noopLatency{}
|
||||
// RequestResult is the result metric that rest clients will update.
|
||||
RequestResult ResultMetric = noopResult{}
|
||||
)
|
||||
@ -62,6 +64,7 @@ type RegisterOpts struct {
|
||||
ClientCertExpiry ExpiryMetric
|
||||
ClientCertRotationAge DurationMetric
|
||||
RequestLatency LatencyMetric
|
||||
RateLimiterLatency LatencyMetric
|
||||
RequestResult ResultMetric
|
||||
}
|
||||
|
||||
@ -78,6 +81,9 @@ func Register(opts RegisterOpts) {
|
||||
if opts.RequestLatency != nil {
|
||||
RequestLatency = opts.RequestLatency
|
||||
}
|
||||
if opts.RateLimiterLatency != nil {
|
||||
RateLimiterLatency = opts.RateLimiterLatency
|
||||
}
|
||||
if opts.RequestResult != nil {
|
||||
RequestResult = opts.RequestResult
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user