client-go: deltaFIFO trace slow handlers

If the informers handlers are slow processing the objects, the deltaFIFO
blocks the queue and the streamWatchers can not add new elements to the
queue, creating contention and causing different problems, like high
memory usage.
The problem is not easy to identify from a user perspective, typically
you can use pprof to identify a high memory usage on the StreamWatchers
or some handler consuming most of the cpu time, but users should not
have to profile the golang binary in order to know that.

Metrics were disabled on the reflector because of memory leaks, also
monitoring the queue depth can't give a good signal, since it never goes high

However, we can trace slow handlers and inform users about the problem.

Kubernetes-commit: d38c2df2c4b945bcf1f81714fc6bfd01bbd0f538
This commit is contained in:
Antonio Ojea 2021-07-26 16:00:31 +02:00 committed by Kubernetes Publisher
parent fa98c04850
commit c63d830a92

View File

@ -20,10 +20,12 @@ import (
"errors"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
@ -526,6 +528,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
}
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
@ -536,6 +539,18 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
continue
}
delete(f.items, id)
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue.
// Queue depth never goes high because processing an item is locking the queue,
// and new items can't be added until processing finish.
// https://github.com/kubernetes/kubernetes/issues/103789
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)