diff --git a/go.mod b/go.mod index 8c84da34..0f8e3b30 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( golang.org/x/time v0.3.0 google.golang.org/protobuf v1.30.0 k8s.io/api v0.0.0-20230810042731-2f6eec10c476 - k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd + k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 k8s.io/klog/v2 v2.100.1 k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 @@ -61,5 +61,5 @@ require ( replace ( k8s.io/api => k8s.io/api v0.0.0-20230810042731-2f6eec10c476 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 ) diff --git a/go.sum b/go.sum index 59b55229..5993c90b 100644 --- a/go.sum +++ b/go.sum @@ -148,8 +148,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/api v0.0.0-20230810042731-2f6eec10c476 h1:1LpPoqkYurARQ/TQ0U3DRAclyCkM7hMCMCUVymCR3jM= k8s.io/api v0.0.0-20230810042731-2f6eec10c476/go.mod h1:RFi7MZgMNqcWc0azfutkpPR/OdHWZjnTAwdFHTKjAUQ= -k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd h1:x//MctFnLnU7WIEUEorfJtI5RkFptZADuNKPFxZBdbg= -k8s.io/apimachinery v0.0.0-20230815235016-14436eb53afd/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= +k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61 h1:PQAKasgkOolbHn5mI2MIMaytDF453zUaMrnG6nrqwy4= +k8s.io/apimachinery v0.0.0-20230815235018-4c378f8a8a61/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index 380c0645..a363d1af 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -238,8 +238,12 @@ func (q *Type) Done(item interface{}) { // ShutDown will cause q to ignore all new items added to it and // immediately instruct the worker goroutines to exit. func (q *Type) ShutDown() { - q.setDrain(false) - q.shutdown() + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.drain = false + q.shuttingDown = true + q.cond.Broadcast() } // ShutDownWithDrain will cause q to ignore all new items added to it. As soon @@ -252,53 +256,16 @@ func (q *Type) ShutDown() { // ShutDownWithDrain, as to force the queue shut down to terminate immediately // without waiting for the drainage. func (q *Type) ShutDownWithDrain() { - q.setDrain(true) - q.shutdown() - for q.isProcessing() && q.shouldDrain() { - q.waitForProcessing() - } -} - -// isProcessing indicates if there are still items on the work queue being -// processed. It's used to drain the work queue on an eventual shutdown. -func (q *Type) isProcessing() bool { q.cond.L.Lock() defer q.cond.L.Unlock() - return q.processing.len() != 0 -} -// waitForProcessing waits for the worker goroutines to finish processing items -// and call Done on them. -func (q *Type) waitForProcessing() { - q.cond.L.Lock() - defer q.cond.L.Unlock() - // Ensure that we do not wait on a queue which is already empty, as that - // could result in waiting for Done to be called on items in an empty queue - // which has already been shut down, which will result in waiting - // indefinitely. - if q.processing.len() == 0 { - return - } - q.cond.Wait() -} - -func (q *Type) setDrain(shouldDrain bool) { - q.cond.L.Lock() - defer q.cond.L.Unlock() - q.drain = shouldDrain -} - -func (q *Type) shouldDrain() bool { - q.cond.L.Lock() - defer q.cond.L.Unlock() - return q.drain -} - -func (q *Type) shutdown() { - q.cond.L.Lock() - defer q.cond.L.Unlock() + q.drain = true q.shuttingDown = true q.cond.Broadcast() + + for q.processing.len() != 0 && q.drain { + q.cond.Wait() + } } func (q *Type) ShuttingDown() bool {