mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #103917 from aojea/slow_handlers
client-go: deltaFIFO trace slow handlers
This commit is contained in:
commit
ec663ada4b
@ -20,10 +20,12 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
utiltrace "k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
|
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
|
||||||
@ -526,6 +528,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
id := f.queue[0]
|
id := f.queue[0]
|
||||||
f.queue = f.queue[1:]
|
f.queue = f.queue[1:]
|
||||||
|
depth := len(f.queue)
|
||||||
if f.initialPopulationCount > 0 {
|
if f.initialPopulationCount > 0 {
|
||||||
f.initialPopulationCount--
|
f.initialPopulationCount--
|
||||||
}
|
}
|
||||||
@ -536,6 +539,18 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
delete(f.items, id)
|
delete(f.items, id)
|
||||||
|
// Only log traces if the queue depth is greater than 10 and it takes more than
|
||||||
|
// 100 milliseconds to process one item from the queue.
|
||||||
|
// Queue depth never goes high because processing an item is locking the queue,
|
||||||
|
// and new items can't be added until processing finish.
|
||||||
|
// https://github.com/kubernetes/kubernetes/issues/103789
|
||||||
|
if depth > 10 {
|
||||||
|
trace := utiltrace.New("DeltaFIFO Pop Process",
|
||||||
|
utiltrace.Field{Key: "ID", Value: id},
|
||||||
|
utiltrace.Field{Key: "Depth", Value: depth},
|
||||||
|
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
|
||||||
|
defer trace.LogIfLong(100 * time.Millisecond)
|
||||||
|
}
|
||||||
err := process(item)
|
err := process(item)
|
||||||
if e, ok := err.(ErrRequeue); ok {
|
if e, ok := err.(ErrRequeue); ok {
|
||||||
f.addIfNotPresent(id, item)
|
f.addIfNotPresent(id, item)
|
||||||
|
Loading…
Reference in New Issue
Block a user