diff --git a/rest/request.go b/rest/request.go index 956d924d..4c04eaad 100644 --- a/rest/request.go +++ b/rest/request.go @@ -39,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + utilclock "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/watch" restclientwatch "k8s.io/client-go/rest/watch" @@ -556,37 +557,69 @@ func (r *Request) tryThrottle(ctx context.Context) error { klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String()) } if latency > extraLongThrottleLatency { - globalThrottledLogger.Log(2, fmt.Sprintf("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())) + // If the rate limiter latency is very high, the log message should be printed at a higher log level, + // but we use a throttled logger to prevent spamming. + globalThrottledLogger.Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String()) } + metrics.RateLimiterLatency.Observe(r.verb, r.finalURLTemplate(), latency) return err } -type throttledLogger struct { - logTimeLock sync.RWMutex - lastLogTime time.Time +type throttleSettings struct { + logLevel klog.Level minLogInterval time.Duration + + lastLogTime time.Time + lock sync.RWMutex +} + +type throttledLogger struct { + clock utilclock.PassiveClock + settings []*throttleSettings } var globalThrottledLogger = &throttledLogger{ - minLogInterval: 1 * time.Second, + clock: utilclock.RealClock{}, + settings: []*throttleSettings{ + { + logLevel: 2, + minLogInterval: 1 * time.Second, + }, { + logLevel: 0, + minLogInterval: 10 * time.Second, + }, + }, } -func (b *throttledLogger) Log(level klog.Level, message string) { - if bool(klog.V(level)) { - if func() bool { - b.logTimeLock.RLock() - defer b.logTimeLock.RUnlock() - return time.Since(b.lastLogTime) > b.minLogInterval - }() { - b.logTimeLock.Lock() - defer b.logTimeLock.Unlock() - if time.Since(b.lastLogTime) > b.minLogInterval { - klog.V(level).Info(message) - b.lastLogTime = time.Now() +func (b *throttledLogger) attemptToLog() (klog.Level, bool) { + for _, setting := range b.settings { + if bool(klog.V(setting.logLevel)) { + // Return early without write locking if possible. + if func() bool { + setting.lock.RLock() + defer setting.lock.RUnlock() + return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval + }() { + setting.lock.Lock() + defer setting.lock.Unlock() + if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval { + setting.lastLogTime = b.clock.Now() + return setting.logLevel, true + } } + return -1, false } } + return -1, false +} + +// Infof will write a log message at each logLevel specified by the reciever's throttleSettings +// as long as it hasn't written a log message more recently than minLogInterval. +func (b *throttledLogger) Infof(message string, args ...interface{}) { + if logLevel, ok := b.attemptToLog(); ok { + klog.V(logLevel).Infof(message, args...) + } } // Watch attempts to begin watching the requested location. diff --git a/rest/request_test.go b/rest/request_test.go index 5d9fbc4a..2a183ba4 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -31,6 +31,7 @@ import ( "os" "reflect" "strings" + "sync" "syscall" "testing" "time" @@ -2190,3 +2191,30 @@ func TestRequestPreflightCheck(t *testing.T) { }) } } + +func TestThrottledLogger(t *testing.T) { + now := time.Now() + clock := clock.NewFakeClock(now) + globalThrottledLogger.clock = clock + + logMessages := 0 + for i := 0; i < 10000; i++ { + var wg sync.WaitGroup + wg.Add(100) + for j := 0; j < 100; j++ { + go func() { + if _, ok := globalThrottledLogger.attemptToLog(); ok { + logMessages++ + } + wg.Done() + }() + } + wg.Wait() + now = now.Add(1 * time.Second) + clock.SetTime(now) + } + + if a, e := logMessages, 1000; a != e { + t.Fatalf("expected %v log messages, but got %v", e, a) + } +} diff --git a/tools/metrics/metrics.go b/tools/metrics/metrics.go index 6a8f25a9..5194026b 100644 --- a/tools/metrics/metrics.go +++ b/tools/metrics/metrics.go @@ -53,6 +53,8 @@ var ( ClientCertRotationAge DurationMetric = noopDuration{} // RequestLatency is the latency metric that rest clients will update. RequestLatency LatencyMetric = noopLatency{} + // RateLimiterLatency is the client side rate limiter latency metric. + RateLimiterLatency LatencyMetric = noopLatency{} // RequestResult is the result metric that rest clients will update. RequestResult ResultMetric = noopResult{} ) @@ -62,6 +64,7 @@ type RegisterOpts struct { ClientCertExpiry ExpiryMetric ClientCertRotationAge DurationMetric RequestLatency LatencyMetric + RateLimiterLatency LatencyMetric RequestResult ResultMetric } @@ -78,6 +81,9 @@ func Register(opts RegisterOpts) { if opts.RequestLatency != nil { RequestLatency = opts.RequestLatency } + if opts.RateLimiterLatency != nil { + RateLimiterLatency = opts.RateLimiterLatency + } if opts.RequestResult != nil { RequestResult = opts.RequestResult }