Stop() for Ticker to enable leak-free code

Kubernetes-commit: 1f393cdef96fe6e4ddcbf93825d65a9980463406
This commit is contained in:
Mikhail Mazurskiy 2018-03-31 19:41:43 +11:00 committed by Kubernetes Publisher
parent 9cabba1998
commit 9446f4ef9d
2 changed files with 5 additions and 7 deletions

View File

@ -45,7 +45,7 @@ func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
@ -67,10 +67,7 @@ type delayingType struct {
stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing
//
// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
// clock.Tick will leak.
heartbeat <-chan time.Time
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor
@ -138,6 +135,7 @@ func (pq waitForPriorityQueue) Peek() interface{} {
func (q *delayingType) ShutDown() {
q.Interface.ShutDown()
close(q.stopCh)
q.heartbeat.Stop()
}
// AddAfter adds the given item to the work queue after the given delay
@ -209,7 +207,7 @@ func (q *delayingType) waitingLoop() {
case <-q.stopCh:
return
case <-q.heartbeat:
case <-q.heartbeat.C():
// continue the loop, which will add ready items
case <-nextReadyAt:

View File

@ -30,7 +30,7 @@ func TestRateLimitingQueue(t *testing.T) {
delayingQueue := &delayingType{
Interface: New(),
clock: fakeClock,
heartbeat: fakeClock.Tick(maxWait),
heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),