mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-01 23:40:27 +00:00
Merge pull request #119554 from davidmccormick/prevent-race-client-go-workqueue-shutdown
client-go workqueue: fix shutdown race / test flake Kubernetes-commit: f234aa52cdcd996d00cd14b94902b3a3752595ea
This commit is contained in:
commit
b97d8af093
4
go.mod
4
go.mod
@ -24,7 +24,7 @@ require (
|
|||||||
golang.org/x/time v0.3.0
|
golang.org/x/time v0.3.0
|
||||||
google.golang.org/protobuf v1.30.0
|
google.golang.org/protobuf v1.30.0
|
||||||
k8s.io/api v0.0.0-20230810042731-2f6eec10c476
|
k8s.io/api v0.0.0-20230810042731-2f6eec10c476
|
||||||
k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd
|
k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61
|
||||||
k8s.io/klog/v2 v2.100.1
|
k8s.io/klog/v2 v2.100.1
|
||||||
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9
|
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9
|
||||||
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
|
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
|
||||||
@ -61,5 +61,5 @@ require (
|
|||||||
|
|
||||||
replace (
|
replace (
|
||||||
k8s.io/api => k8s.io/api v0.0.0-20230810042731-2f6eec10c476
|
k8s.io/api => k8s.io/api v0.0.0-20230810042731-2f6eec10c476
|
||||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd
|
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61
|
||||||
)
|
)
|
||||||
|
4
go.sum
4
go.sum
@ -148,8 +148,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
k8s.io/api v0.0.0-20230810042731-2f6eec10c476 h1:1LpPoqkYurARQ/TQ0U3DRAclyCkM7hMCMCUVymCR3jM=
|
k8s.io/api v0.0.0-20230810042731-2f6eec10c476 h1:1LpPoqkYurARQ/TQ0U3DRAclyCkM7hMCMCUVymCR3jM=
|
||||||
k8s.io/api v0.0.0-20230810042731-2f6eec10c476/go.mod h1:RFi7MZgMNqcWc0azfutkpPR/OdHWZjnTAwdFHTKjAUQ=
|
k8s.io/api v0.0.0-20230810042731-2f6eec10c476/go.mod h1:RFi7MZgMNqcWc0azfutkpPR/OdHWZjnTAwdFHTKjAUQ=
|
||||||
k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd h1:x//MctFnLnU7WIEUEorfJtI5RkFptZADuNKPFxZBdbg=
|
k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 h1:PQAKasgkOolbHn5mI2MIMaytDF453zUaMrnG6nrqwy4=
|
||||||
k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw=
|
k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw=
|
||||||
k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
|
k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
|
||||||
k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
|
k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
|
||||||
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ=
|
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ=
|
||||||
|
@ -238,8 +238,12 @@ func (q *Type) Done(item interface{}) {
|
|||||||
// ShutDown will cause q to ignore all new items added to it and
|
// ShutDown will cause q to ignore all new items added to it and
|
||||||
// immediately instruct the worker goroutines to exit.
|
// immediately instruct the worker goroutines to exit.
|
||||||
func (q *Type) ShutDown() {
|
func (q *Type) ShutDown() {
|
||||||
q.setDrain(false)
|
q.cond.L.Lock()
|
||||||
q.shutdown()
|
defer q.cond.L.Unlock()
|
||||||
|
|
||||||
|
q.drain = false
|
||||||
|
q.shuttingDown = true
|
||||||
|
q.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
|
// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
|
||||||
@ -252,53 +256,16 @@ func (q *Type) ShutDown() {
|
|||||||
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
||||||
// without waiting for the drainage.
|
// without waiting for the drainage.
|
||||||
func (q *Type) ShutDownWithDrain() {
|
func (q *Type) ShutDownWithDrain() {
|
||||||
q.setDrain(true)
|
|
||||||
q.shutdown()
|
|
||||||
for q.isProcessing() && q.shouldDrain() {
|
|
||||||
q.waitForProcessing()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// isProcessing indicates if there are still items on the work queue being
|
|
||||||
// processed. It's used to drain the work queue on an eventual shutdown.
|
|
||||||
func (q *Type) isProcessing() bool {
|
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
return q.processing.len() != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitForProcessing waits for the worker goroutines to finish processing items
|
q.drain = true
|
||||||
// and call Done on them.
|
|
||||||
func (q *Type) waitForProcessing() {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
// Ensure that we do not wait on a queue which is already empty, as that
|
|
||||||
// could result in waiting for Done to be called on items in an empty queue
|
|
||||||
// which has already been shut down, which will result in waiting
|
|
||||||
// indefinitely.
|
|
||||||
if q.processing.len() == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
q.cond.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Type) setDrain(shouldDrain bool) {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
q.drain = shouldDrain
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Type) shouldDrain() bool {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
return q.drain
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Type) shutdown() {
|
|
||||||
q.cond.L.Lock()
|
|
||||||
defer q.cond.L.Unlock()
|
|
||||||
q.shuttingDown = true
|
q.shuttingDown = true
|
||||||
q.cond.Broadcast()
|
q.cond.Broadcast()
|
||||||
|
|
||||||
|
for q.processing.len() != 0 && q.drain {
|
||||||
|
q.cond.Wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Type) ShuttingDown() bool {
|
func (q *Type) ShuttingDown() bool {
|
||||||
|
Loading…
Reference in New Issue
Block a user