diff --git a/go.mod b/go.mod index 9b3fa2fb..41b9d913 100644 --- a/go.mod +++ b/go.mod @@ -25,8 +25,8 @@ require ( golang.org/x/time v0.9.0 google.golang.org/protobuf v1.36.5 gopkg.in/evanphx/json-patch.v4 v4.12.0 - k8s.io/api v0.0.0-20250313213104-173e173fb2b8 - k8s.io/apimachinery v0.0.0-20250313012745-a04ff375cef8 + k8s.io/api v0.0.0-20250313213105-aae61a387040 + k8s.io/apimachinery v0.0.0-20250314052748-eaf4038701d0 k8s.io/klog/v2 v2.130.1 k8s.io/kube-openapi v0.0.0-20250304201544-e5f78fe3ede9 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 diff --git a/go.sum b/go.sum index 45cb6a89..f6f8d7a8 100644 --- a/go.sum +++ b/go.sum @@ -146,10 +146,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.0.0-20250313213104-173e173fb2b8 h1:Z7fpBA1zEvgJJjIhketIRekQDQzUp67A3ou5MQZBBFk= -k8s.io/api v0.0.0-20250313213104-173e173fb2b8/go.mod h1:nyT7xKOld0k8IraPSyWNF96YIMppoRBYxtAUIehRQW0= -k8s.io/apimachinery v0.0.0-20250313012745-a04ff375cef8 h1:aZ3lyC/cbDqhg3/RCSobSABNBeUzBWh6LrHeSnIVzVY= -k8s.io/apimachinery v0.0.0-20250313012745-a04ff375cef8/go.mod h1:S2OIkExGqJOXYSYcAJwQ9zWcc6BkBUdTJUu4M7z0cvo= +k8s.io/api v0.0.0-20250313213105-aae61a387040 h1:t4Fsq5t8BkN81dKsvJL0PobJ7UcEONapi8OEL7GKxdo= +k8s.io/api v0.0.0-20250313213105-aae61a387040/go.mod h1:nyT7xKOld0k8IraPSyWNF96YIMppoRBYxtAUIehRQW0= +k8s.io/apimachinery v0.0.0-20250314052748-eaf4038701d0 h1:JT6F6sHvZMnT4KRpUaBsuOg5b7AQzK7qwv1stub2/Q4= +k8s.io/apimachinery v0.0.0-20250314052748-eaf4038701d0/go.mod h1:S2OIkExGqJOXYSYcAJwQ9zWcc6BkBUdTJUu4M7z0cvo= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250304201544-e5f78fe3ede9 h1:t0huyHnz6HsokckRxAF1bY0cqPFwzINKCL7yltEjZQc= diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go index e33a6c69..da444f4f 100644 --- a/util/workqueue/delaying_queue.go +++ b/util/workqueue/delaying_queue.go @@ -22,6 +22,7 @@ import ( "time" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" "k8s.io/utils/clock" ) @@ -46,6 +47,10 @@ type DelayingQueueConfig = TypedDelayingQueueConfig[any] // TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface. type TypedDelayingQueueConfig[T comparable] struct { + // An optional logger. The name of the queue does *not* get added to it, this should + // be done by the caller if desired. + Logger *klog.Logger + // Name for the queue. If unnamed, the metrics will not be registered. Name string @@ -94,6 +99,10 @@ func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] { // NewTypedDelayingQueueWithConfig constructs a new workqueue with options to // customize different properties. func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] { + logger := klog.Background() + if config.Logger != nil { + logger = *config.Logger + } if config.Clock == nil { config.Clock = clock.RealClock{} } @@ -106,7 +115,7 @@ func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConf }) } - return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider) + return newDelayingQueue(logger, config.Clock, config.Queue, config.Name, config.MetricsProvider) } // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to @@ -135,7 +144,7 @@ func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) Delayi }) } -func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] { +func newDelayingQueue[T comparable](logger klog.Logger, clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] { ret := &delayingType[T]{ TypedInterface: q, clock: clock, @@ -145,7 +154,7 @@ func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], metrics: newRetryMetrics(name, provider), } - go ret.waitingLoop() + go ret.waitingLoop(logger) return ret } @@ -264,8 +273,8 @@ func (q *delayingType[T]) AddAfter(item T, duration time.Duration) { const maxWait = 10 * time.Second // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. -func (q *delayingType[T]) waitingLoop() { - defer utilruntime.HandleCrash() +func (q *delayingType[T]) waitingLoop(logger klog.Logger) { + defer utilruntime.HandleCrashWithLogger(logger) // Make a placeholder channel to use when there are no items in our list never := make(<-chan time.Time) diff --git a/util/workqueue/parallelizer.go b/util/workqueue/parallelizer.go index 366bf20a..9f986a25 100644 --- a/util/workqueue/parallelizer.go +++ b/util/workqueue/parallelizer.go @@ -74,7 +74,7 @@ func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWo wg.Add(workers) for i := 0; i < workers; i++ { go func() { - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrashWithContext(ctx) defer wg.Done() for chunk := range toProcess { start := chunk * chunkSize