mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 18:24:07 +00:00
Merge pull request #119554 from davidmccormick/prevent-race-client-go-workqueue-shutdown
client-go workqueue: fix shutdown race / test flake
This commit is contained in:
commit
f234aa52cd
@ -238,8 +238,12 @@ func (q *Type) Done(item interface{}) {
|
|||||||
// ShutDown will cause q to ignore all new items added to it and
|
// ShutDown will cause q to ignore all new items added to it and
|
||||||
// immediately instruct the worker goroutines to exit.
|
// immediately instruct the worker goroutines to exit.
|
||||||
func (q *Type) ShutDown() {
|
func (q *Type) ShutDown() {
|
||||||
q.setDrain(false)
|
q.cond.L.Lock()
|
||||||
q.shutdown()
|
defer q.cond.L.Unlock()
|
||||||
|
|
||||||
|
q.drain = false
|
||||||
|
q.shuttingDown = true
|
||||||
|
q.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
|
// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
|
||||||
@ -252,53 +256,16 @@ func (q *Type) ShutDown() {
|
|||||||
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
||||||
// without waiting for the drainage.
|
// without waiting for the drainage.
|
||||||
func (q *Type) ShutDownWithDrain() {
|
func (q *Type) ShutDownWithDrain() {
|
||||||
q.setDrain(true)
|
|
||||||
q.shutdown()
|
|
||||||
for q.isProcessing() && q.shouldDrain() {
|
|
||||||
q.waitForProcessing()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// isProcessing indicates if there are still items on the work queue being
|
|
||||||
// processed. It's used to drain the work queue on an eventual shutdown.
|
|
||||||
func (q *Type) isProcessing() bool {
|
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
return q.processing.len() != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitForProcessing waits for the worker goroutines to finish processing items
|
q.drain = true
|
||||||
// and call Done on them.
|
|
||||||
func (q *Type) waitForProcessing() {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
// Ensure that we do not wait on a queue which is already empty, as that
|
|
||||||
// could result in waiting for Done to be called on items in an empty queue
|
|
||||||
// which has already been shut down, which will result in waiting
|
|
||||||
// indefinitely.
|
|
||||||
if q.processing.len() == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
q.cond.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Type) setDrain(shouldDrain bool) {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
q.drain = shouldDrain
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Type) shouldDrain() bool {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
return q.drain
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Type) shutdown() {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
q.shuttingDown = true
|
q.shuttingDown = true
|
||||||
q.cond.Broadcast()
|
q.cond.Broadcast()
|
||||||
|
|
||||||
|
for q.processing.len() != 0 && q.drain {
|
||||||
|
q.cond.Wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Type) ShuttingDown() bool {
|
func (q *Type) ShuttingDown() bool {
|
||||||
|
Loading…
Reference in New Issue
Block a user