diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index edaff5af2bd..72692dea8a3 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -182,21 +182,24 @@ func NewDisruptionControllerInternal(ctx context.Context, workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{ DelayingQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[string]{ - Clock: clock, - Name: "disruption", + Logger: &logger, + Clock: clock, + Name: "disruption", }), }, ), recheckQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[string]{ - Clock: clock, - Name: "disruption_recheck", + Logger: &logger, + Clock: clock, + Name: "disruption_recheck", }), stalePodDisruptionQueue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{ DelayingQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[string]{ - Clock: clock, - Name: "stale_pod_disruption", + Logger: &logger, + Clock: clock, + Name: "stale_pod_disruption", }), }, ), diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go index e33a6c6929d..da444f4fb35 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go @@ -22,6 +22,7 @@ import ( "time" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" "k8s.io/utils/clock" ) @@ -46,6 +47,10 @@ type DelayingQueueConfig = TypedDelayingQueueConfig[any] // TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface. type TypedDelayingQueueConfig[T comparable] struct { + // An optional logger. The name of the queue does *not* get added to it, this should + // be done by the caller if desired. + Logger *klog.Logger + // Name for the queue. If unnamed, the metrics will not be registered. Name string @@ -94,6 +99,10 @@ func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] { // NewTypedDelayingQueueWithConfig constructs a new workqueue with options to // customize different properties. func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] { + logger := klog.Background() + if config.Logger != nil { + logger = *config.Logger + } if config.Clock == nil { config.Clock = clock.RealClock{} } @@ -106,7 +115,7 @@ func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConf }) } - return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider) + return newDelayingQueue(logger, config.Clock, config.Queue, config.Name, config.MetricsProvider) } // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to @@ -135,7 +144,7 @@ func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) Delayi }) } -func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] { +func newDelayingQueue[T comparable](logger klog.Logger, clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] { ret := &delayingType[T]{ TypedInterface: q, clock: clock, @@ -145,7 +154,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], metrics: newRetryMetrics(name, provider), } - go ret.waitingLoop() + go ret.waitingLoop(logger) return ret } @@ -264,8 +273,8 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) { const maxWait = 10 * time.Second // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. -func (q *delayingType[T]) waitingLoop() { - defer utilruntime.HandleCrash() +func (q *delayingType[T]) waitingLoop(logger klog.Logger) { + defer utilruntime.HandleCrashWithLogger(logger) // Make a placeholder channel to use when there are no items in our list never := make(<-chan time.Time) diff --git a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go index 366bf20a312..9f986a25a40 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go +++ b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go @@ -74,7 +74,7 @@ func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWo wg.Add(workers) for i := 0; i < workers; i++ { go func() { - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrashWithContext(ctx) defer wg.Done() for chunk := range toProcess { start := chunk * chunkSize