From bdead75b4b5d9feedc3a106c6ff56575fecc2ae1 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 27 Apr 2019 16:16:55 -0400 Subject: [PATCH] DelayingQueue.ShutDown() should be reentrant All queue ShutDown() calls should be able to be invoked multiple times. ``` Observed a panic: "close of closed channel" (close of closed channel) /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:76 /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:65 /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:51 /usr/local/go/src/runtime/asm_amd64.s:573 /usr/local/go/src/runtime/panic.go:502 /usr/local/go/src/runtime/chan.go:333 /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/client-go/util/workqueue/delaying_queue.go:137 ``` Use sync.Once to guarantee a single close. Kubernetes-commit: d2f7eb5235a93556261c8947e7a87342aeeaee2b --- util/workqueue/delaying_queue.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/util/workqueue/delaying_queue.go b/util/workqueue/delaying_queue.go index bd654bf3..6c9e9447 100644 --- a/util/workqueue/delaying_queue.go +++ b/util/workqueue/delaying_queue.go @@ -18,6 +18,7 @@ package workqueue import ( "container/heap" + "sync" "time" "k8s.io/apimachinery/pkg/util/clock" @@ -66,6 +67,8 @@ type delayingType struct { // stopCh lets us signal a shutdown to the waiting loop stopCh chan struct{} + // stopOnce guarantees we only signal shutdown a single time + stopOnce sync.Once // heartbeat ensures we wait no more than maxWait before firing heartbeat clock.Ticker @@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} { return pq[0] } -// ShutDown gives a way to shut off this queue +// ShutDown stops the queue. After the queue drains, the returned shutdown bool +// on Get() will be true. This method may be invoked more than once. func (q *delayingType) ShutDown() { - q.Interface.ShutDown() - close(q.stopCh) - q.heartbeat.Stop() + q.stopOnce.Do(func() { + q.Interface.ShutDown() + close(q.stopCh) + q.heartbeat.Stop() + }) } // AddAfter adds the given item to the work queue after the given delay