From e9b0103a41fa3964de05455d5b1e8187afeda05d Mon Sep 17 00:00:00 2001 From: Dave McCormick Date: Mon, 24 Jul 2023 22:40:54 +0100 Subject: [PATCH] Fix a race where a call to Shutdown was happening after ShutDownWithDrain() had performed the check q.isProcessing() && q.shouldDrain() and before waitForProcessing() has reached its Wait(). This is because waitForProcessing() is only checking the length part of the condition instead of both the length and whether we still need to drain. It turned out that there were lots of unnecessary accessor functions being called locking and unlocking the cond which are not needed because Wait() automatically unlocks and locks the cond for us and best practice says we should run it in a for checking for the condition (so this is what I have done). Kubernetes-commit: 2b12df56b0be93bc2cac7c5a66342c0ffaa72311 --- util/workqueue/queue.go | 55 +++++++++-------------------------------- 1 file changed, 11 insertions(+), 44 deletions(-) diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index 380c0645..a363d1af 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -238,8 +238,12 @@ func (q *Type) Done(item interface{}) { // ShutDown will cause q to ignore all new items added to it and // immediately instruct the worker goroutines to exit. func (q *Type) ShutDown() { - q.setDrain(false) - q.shutdown() + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.drain = false + q.shuttingDown = true + q.cond.Broadcast() } // ShutDownWithDrain will cause q to ignore all new items added to it. As soon @@ -252,53 +256,16 @@ func (q *Type) ShutDown() { // ShutDownWithDrain, as to force the queue shut down to terminate immediately // without waiting for the drainage. func (q *Type) ShutDownWithDrain() { - q.setDrain(true) - q.shutdown() - for q.isProcessing() && q.shouldDrain() { - q.waitForProcessing() - } -} - -// isProcessing indicates if there are still items on the work queue being -// processed. It's used to drain the work queue on an eventual shutdown. -func (q *Type) isProcessing() bool { q.cond.L.Lock() defer q.cond.L.Unlock() - return q.processing.len() != 0 -} -// waitForProcessing waits for the worker goroutines to finish processing items -// and call Done on them. -func (q *Type) waitForProcessing() { - q.cond.L.Lock() - defer q.cond.L.Unlock() - // Ensure that we do not wait on a queue which is already empty, as that - // could result in waiting for Done to be called on items in an empty queue - // which has already been shut down, which will result in waiting - // indefinitely. - if q.processing.len() == 0 { - return - } - q.cond.Wait() -} - -func (q *Type) setDrain(shouldDrain bool) { - q.cond.L.Lock() - defer q.cond.L.Unlock() - q.drain = shouldDrain -} - -func (q *Type) shouldDrain() bool { - q.cond.L.Lock() - defer q.cond.L.Unlock() - return q.drain -} - -func (q *Type) shutdown() { - q.cond.L.Lock() - defer q.cond.L.Unlock() + q.drain = true q.shuttingDown = true q.cond.Broadcast() + + for q.processing.len() != 0 && q.drain { + q.cond.Wait() + } } func (q *Type) ShuttingDown() bool {