Merge pull request #129345 from pohly/log-client-go-workqueue

client-go workqueue: add optional logger
This commit is contained in:
Kubernetes Prow Robot 2025-03-14 06:37:53 -07:00 committed by GitHub
commit 9fd0e20bc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 12 deletions

View File

@ -182,21 +182,24 @@ func NewDisruptionControllerInternal(ctx context.Context,
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
DelayingQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[string]{
Clock: clock,
Name: "disruption",
Logger: &logger,
Clock: clock,
Name: "disruption",
}),
},
),
recheckQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[string]{
Clock: clock,
Name: "disruption_recheck",
Logger: &logger,
Clock: clock,
Name: "disruption_recheck",
}),
stalePodDisruptionQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
DelayingQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[string]{
Clock: clock,
Name: "stale_pod_disruption",
Logger: &logger,
Clock: clock,
Name: "stale_pod_disruption",
}),
},
),

View File

@ -22,6 +22,7 @@ import (
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
@ -46,6 +47,10 @@ type DelayingQueueConfig = TypedDelayingQueueConfig[any]
// TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
type TypedDelayingQueueConfig[T comparable] struct {
// An optional logger. The name of the queue does *not* get added to it, this should
// be done by the caller if desired.
Logger *klog.Logger
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
@ -94,6 +99,10 @@ func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] {
// NewTypedDelayingQueueWithConfig constructs a new workqueue with options to
// customize different properties.
func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] {
logger := klog.Background()
if config.Logger != nil {
logger = *config.Logger
}
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
@ -106,7 +115,7 @@ func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConf
})
}
return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
return newDelayingQueue(logger, config.Clock, config.Queue, config.Name, config.MetricsProvider)
}
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
@ -135,7 +144,7 @@ func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) Delayi
})
}
func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
func newDelayingQueue[T comparable](logger klog.Logger, clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
ret := &delayingType[T]{
TypedInterface: q,
clock: clock,
@ -145,7 +154,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T],
metrics: newRetryMetrics(name, provider),
}
go ret.waitingLoop()
go ret.waitingLoop(logger)
return ret
}
@ -264,8 +273,8 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
const maxWait = 10 * time.Second
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType[T]) waitingLoop() {
defer utilruntime.HandleCrash()
func (q *delayingType[T]) waitingLoop(logger klog.Logger) {
defer utilruntime.HandleCrashWithLogger(logger)
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)

View File

@ -74,7 +74,7 @@ func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWo
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer utilruntime.HandleCrashWithContext(ctx)
defer wg.Done()
for chunk := range toProcess {
start := chunk * chunkSize