From c63d830a9209086ce6fa875f358360b8d7615c02 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 26 Jul 2021 16:00:31 +0200 Subject: [PATCH] client-go: deltaFIFO trace slow handlers If the informers handlers are slow processing the objects, the deltaFIFO blocks the queue and the streamWatchers can not add new elements to the queue, creating contention and causing different problems, like high memory usage. The problem is not easy to identify from a user perspective, typically you can use pprof to identify a high memory usage on the StreamWatchers or some handler consuming most of the cpu time, but users should not have to profile the golang binary in order to know that. Metrics were disabled on the reflector because of memory leaks, also monitoring the queue depth can't give a good signal, since it never goes high However, we can trace slow handlers and inform users about the problem. Kubernetes-commit: d38c2df2c4b945bcf1f81714fc6bfd01bbd0f538 --- tools/cache/delta_fifo.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 82038e0f..54b81766 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -20,10 +20,12 @@ import ( "errors" "fmt" "sync" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + utiltrace "k8s.io/utils/trace" ) // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are @@ -526,6 +528,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { } id := f.queue[0] f.queue = f.queue[1:] + depth := len(f.queue) if f.initialPopulationCount > 0 { f.initialPopulationCount-- } @@ -536,6 +539,18 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { continue } delete(f.items, id) + // Only log traces if the queue depth is greater than 10 and it takes more than + // 100 milliseconds to process one item from the queue. + // Queue depth never goes high because processing an item is locking the queue, + // and new items can't be added until processing finish. + // https://github.com/kubernetes/kubernetes/issues/103789 + if depth > 10 { + trace := utiltrace.New("DeltaFIFO Pop Process", + utiltrace.Field{Key: "ID", Value: id}, + utiltrace.Field{Key: "Depth", Value: depth}, + utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) + defer trace.LogIfLong(100 * time.Millisecond) + } err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item)