From 1463bcbba2d8fb2ba24ae54901bc26d7b45b28a5 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Mon, 3 Nov 2025 19:04:11 +0000 Subject: [PATCH] Fix queue to gracefully shutdown Kubernetes-commit: bdb2e37781aec3bfec477a7beddd9fa5ac4eab1b --- util/workqueue/queue.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index 78b072dab..cb70e6cfc 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -169,12 +169,13 @@ func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMet cond: sync.NewCond(&sync.Mutex{}), metrics: metrics, unfinishedWorkUpdatePeriod: updatePeriod, + stopCh: make(chan struct{}), } // Don't start the goroutine for a type of noMetrics so we don't consume // resources unnecessarily if _, ok := metrics.(noMetrics[T]); !ok { - go t.updateUnfinishedWorkLoop() + t.wg.Go(t.updateUnfinishedWorkLoop) } return t @@ -210,6 +211,14 @@ type Typed[t comparable] struct { unfinishedWorkUpdatePeriod time.Duration clock clock.WithTicker + + // wg manages goroutines started by the queue to allow graceful shutdown + // ShutDown() will wait for goroutines to exit before returning. + wg sync.WaitGroup + + stopCh chan struct{} + // stopOnce guarantees we only signal shutdown a single time + stopOnce sync.Once } // Add marks item as needing processing. When the queue is shutdown new @@ -296,6 +305,11 @@ func (q *Typed[T]) Done(item T) { // goroutines will continue processing items in the queue until it is // empty and then receive the shutdown signal. func (q *Typed[T]) ShutDown() { + defer q.wg.Wait() + q.stopOnce.Do(func() { + defer close(q.stopCh) + }) + q.cond.L.Lock() defer q.cond.L.Unlock() @@ -311,6 +325,10 @@ func (q *Typed[T]) ShutDown() { // Workers must call Done on an item after processing it, otherwise // ShutDownWithDrain will block indefinitely. func (q *Typed[T]) ShutDownWithDrain() { + defer q.wg.Wait() + q.stopOnce.Do(func() { + defer close(q.stopCh) + }) q.cond.L.Lock() defer q.cond.L.Unlock() @@ -333,17 +351,15 @@ func (q *Typed[T]) ShuttingDown() bool { func (q *Typed[T]) updateUnfinishedWorkLoop() { t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) defer t.Stop() - for range t.C() { - if !func() bool { + for { + select { + case <-t.C(): q.cond.L.Lock() - defer q.cond.L.Unlock() if !q.shuttingDown { q.metrics.updateUnfinishedWork() - return true } - return false - - }() { + q.cond.L.Unlock() + case <-q.stopCh: return } }