mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
DelayingQueue.ShutDown() should be reentrant
All queue ShutDown() calls should be able to be invoked multiple times. ``` Observed a panic: "close of closed channel" (close of closed channel) /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:76 /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:65 /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go:51 /usr/local/go/src/runtime/asm_amd64.s:573 /usr/local/go/src/runtime/panic.go:502 /usr/local/go/src/runtime/chan.go:333 /go/src/github.com/openshift/cluster-version-operator/vendor/k8s.io/client-go/util/workqueue/delaying_queue.go:137 ``` Use sync.Once to guarantee a single close.
This commit is contained in:
parent
c88b7cdd58
commit
d2f7eb5235
@ -18,6 +18,7 @@ package workqueue
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
@ -66,6 +67,8 @@ type delayingType struct {
|
|||||||
|
|
||||||
// stopCh lets us signal a shutdown to the waiting loop
|
// stopCh lets us signal a shutdown to the waiting loop
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
|
// stopOnce guarantees we only signal shutdown a single time
|
||||||
|
stopOnce sync.Once
|
||||||
|
|
||||||
// heartbeat ensures we wait no more than maxWait before firing
|
// heartbeat ensures we wait no more than maxWait before firing
|
||||||
heartbeat clock.Ticker
|
heartbeat clock.Ticker
|
||||||
@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} {
|
|||||||
return pq[0]
|
return pq[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShutDown gives a way to shut off this queue
|
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
||||||
|
// on Get() will be true. This method may be invoked more than once.
|
||||||
func (q *delayingType) ShutDown() {
|
func (q *delayingType) ShutDown() {
|
||||||
q.Interface.ShutDown()
|
q.stopOnce.Do(func() {
|
||||||
close(q.stopCh)
|
q.Interface.ShutDown()
|
||||||
q.heartbeat.Stop()
|
close(q.stopCh)
|
||||||
|
q.heartbeat.Stop()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddAfter adds the given item to the work queue after the given delay
|
// AddAfter adds the given item to the work queue after the given delay
|
||||||
|
Loading…
Reference in New Issue
Block a user