mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-03 10:16:40 +00:00
Merge pull request #77170 from smarterclayton/delay_queue_reentrant
DelayingQueue.ShutDown() should be reentrant Kubernetes-commit: 4891eaa3adbfafb61d2bd264d2f0daef124ee8b9
This commit is contained in:
commit
2830425c91
@ -18,6 +18,7 @@ package workqueue
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
@ -66,6 +67,8 @@ type delayingType struct {
|
|||||||
|
|
||||||
// stopCh lets us signal a shutdown to the waiting loop
|
// stopCh lets us signal a shutdown to the waiting loop
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
|
// stopOnce guarantees we only signal shutdown a single time
|
||||||
|
stopOnce sync.Once
|
||||||
|
|
||||||
// heartbeat ensures we wait no more than maxWait before firing
|
// heartbeat ensures we wait no more than maxWait before firing
|
||||||
heartbeat clock.Ticker
|
heartbeat clock.Ticker
|
||||||
@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} {
|
|||||||
return pq[0]
|
return pq[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShutDown gives a way to shut off this queue
|
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
||||||
|
// on Get() will be true. This method may be invoked more than once.
|
||||||
func (q *delayingType) ShutDown() {
|
func (q *delayingType) ShutDown() {
|
||||||
|
q.stopOnce.Do(func() {
|
||||||
q.Interface.ShutDown()
|
q.Interface.ShutDown()
|
||||||
close(q.stopCh)
|
close(q.stopCh)
|
||||||
q.heartbeat.Stop()
|
q.heartbeat.Stop()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddAfter adds the given item to the work queue after the given delay
|
// AddAfter adds the given item to the work queue after the given delay
|
||||||
|
Loading…
Reference in New Issue
Block a user