Merge pull request #137358 from xigang/processlister_log

client-go/cache: add slow-handler tracing in processorListener

Kubernetes-commit: 1f5701a46d6edeb396531968dc8ed332bf69e5f4
This commit is contained in:
Kubernetes Publisher
2026-03-06 12:02:18 +05:30

View File

@@ -35,6 +35,7 @@ import (
"k8s.io/utils/buffer"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
utiltrace "k8s.io/utils/trace"
"k8s.io/klog/v2"
@@ -1213,7 +1214,8 @@ type processorListener struct {
addCh chan interface{}
done chan struct{}
handler ResourceEventHandler
handler ResourceEventHandler
handlerName string
syncTracker *synctrack.SingleFileTracker
upstreamHasSynced DoneChecker
@@ -1224,6 +1226,9 @@ type processorListener struct {
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// pendingNotificationsLength tracks pendingNotifications size and is only mutated by pop().
// run() reads this to decide when to enable expensive time tracing.
pendingNotificationsLength atomic.Int64
// requestedResyncPeriod is how frequently the listener wants a
// full resync from the shared informer, but modified by two
@@ -1261,6 +1266,7 @@ func (p *processorListener) HasSyncedChecker() DoneChecker {
}
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener {
handlerName := nameForHandler(handler)
ret := &processorListener{
logger: logger,
nextCh: make(chan interface{}),
@@ -1268,7 +1274,8 @@ func newProcessListener(logger klog.Logger, handler ResourceEventHandler, reques
done: make(chan struct{}),
upstreamHasSynced: hasSynced,
handler: handler,
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))),
handlerName: handlerName,
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), handlerName)),
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
@@ -1299,7 +1306,9 @@ func (p *processorListener) pop() {
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
if ok {
p.pendingNotificationsLength.Add(-1)
} else { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
@@ -1312,6 +1321,7 @@ func (p *processorListener) pop() {
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
p.pendingNotificationsLength.Add(1)
}
}
}
@@ -1334,6 +1344,14 @@ func (p *processorListener) run() {
// Gets reset below, but only if we get that far.
sleepAfterCrash = true
defer utilruntime.HandleCrashWithLogger(p.logger)
pendingNotifications := p.pendingNotificationsLength.Load()
if pendingNotifications > initialBufferSize {
trace := utiltrace.New("processorListener handler",
utiltrace.Field{Key: "handler", Value: p.handlerName},
utiltrace.Field{Key: "pendingNotifications", Value: pendingNotifications},
)
defer trace.LogIfLong(100 * time.Millisecond)
}
switch notification := next.(type) {
case updateNotification: