diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go index bd654bf3..6c9e9447 100644 --- a/util/workqueue/delaying_queue.go +++ b/util/workqueue/delaying_queue.go @@ -18,6 +18,7 @@ package workqueue import ( "container/heap" + "sync" "time" "k8s.io/apimachinery/pkg/util/clock" @@ -66,6 +67,8 @@ type delayingType struct { // stopCh lets us signal a shutdown to the waiting loop stopCh chan struct{} + // stopOnce guarantees we only signal shutdown a single time + stopOnce sync.Once // heartbeat ensures we wait no more than maxWait before firing heartbeat clock.Ticker @@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} { return pq[0] } -// ShutDown gives a way to shut off this queue +// ShutDown stops the queue. After the queue drains, the returned shutdown bool +// on Get() will be true. This method may be invoked more than once. func (q *delayingType) ShutDown() { - q.Interface.ShutDown() - close(q.stopCh) - q.heartbeat.Stop() + q.stopOnce.Do(func() { + q.Interface.ShutDown() + close(q.stopCh) + q.heartbeat.Stop() + }) } // AddAfter adds the given item to the work queue after the given delay