client-go/cache: add slow-handler tracing in processorListener

Signed-off-by: xigang <wangxigang2014@gmail.com>

Kubernetes-commit: ac1eb12281bdc7fb7e0269fb4bb914369b92bb64
This commit is contained in:
xigang
2026-03-03 13:07:08 +08:00
committed by Kubernetes Publisher
parent c3a1049f7e
commit f6c2d7ca65

View File

@@ -35,6 +35,7 @@ import (
"k8s.io/utils/buffer"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
utiltrace "k8s.io/utils/trace"
"k8s.io/klog/v2"
@@ -1213,7 +1214,8 @@ type processorListener struct {
addCh chan interface{}
done chan struct{}
handler ResourceEventHandler
handler ResourceEventHandler
handlerName string
syncTracker *synctrack.SingleFileTracker
upstreamHasSynced DoneChecker
@@ -1224,6 +1226,9 @@ type processorListener struct {
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// pendingNotificationsLength tracks pendingNotifications size and is only mutated by pop().
// run() reads this to decide when to enable expensive time tracing.
pendingNotificationsLength atomic.Int64
// requestedResyncPeriod is how frequently the listener wants a
// full resync from the shared informer, but modified by two
@@ -1261,6 +1266,7 @@ func (p *processorListener) HasSyncedChecker() DoneChecker {
}
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener {
handlerName := nameForHandler(handler)
ret := &processorListener{
logger: logger,
nextCh: make(chan interface{}),
@@ -1268,7 +1274,8 @@ func newProcessListener(logger klog.Logger, handler ResourceEventHandler, reques
done: make(chan struct{}),
upstreamHasSynced: hasSynced,
handler: handler,
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))),
handlerName: handlerName,
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), handlerName)),
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
@@ -1299,7 +1306,9 @@ func (p *processorListener) pop() {
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
if ok {
p.pendingNotificationsLength.Add(-1)
} else { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
@@ -1312,6 +1321,7 @@ func (p *processorListener) pop() {
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
p.pendingNotificationsLength.Add(1)
}
}
}
@@ -1334,6 +1344,14 @@ func (p *processorListener) run() {
// Gets reset below, but only if we get that far.
sleepAfterCrash = true
defer utilruntime.HandleCrashWithLogger(p.logger)
pendingNotifications := p.pendingNotificationsLength.Load()
if pendingNotifications > initialBufferSize {
trace := utiltrace.New("processorListener handler",
utiltrace.Field{Key: "handler", Value: p.handlerName},
utiltrace.Field{Key: "pendingNotifications", Value: pendingNotifications},
)
defer trace.LogIfLong(100 * time.Millisecond)
}
switch notification := next.(type) {
case updateNotification: