diff --git a/pkg/util/clock.go b/pkg/util/clock.go index ac2d738d628..474cbb68d11 100644 --- a/pkg/util/clock.go +++ b/pkg/util/clock.go @@ -28,6 +28,7 @@ type Clock interface { Since(time.Time) time.Duration After(d time.Duration) <-chan time.Time Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time } var ( @@ -54,6 +55,10 @@ func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) } +func (RealClock) Tick(d time.Duration) <-chan time.Time { + return time.Tick(d) +} + func (RealClock) Sleep(d time.Duration) { time.Sleep(d) } @@ -68,8 +73,10 @@ type FakeClock struct { } type fakeClockWaiter struct { - targetTime time.Time - destChan chan<- time.Time + targetTime time.Time + stepInterval time.Duration + skipIfBlocked bool + destChan chan<- time.Time } func NewFakeClock(t time.Time) *FakeClock { @@ -105,7 +112,22 @@ func (f *FakeClock) After(d time.Duration) <-chan time.Time { return ch } -// Move clock by Duration, notify anyone that's called After +func (f *FakeClock) Tick(d time.Duration) <-chan time.Time { + f.lock.Lock() + defer f.lock.Unlock() + tickTime := f.time.Add(d) + ch := make(chan time.Time, 1) // hold one tick + f.waiters = append(f.waiters, fakeClockWaiter{ + targetTime: tickTime, + stepInterval: d, + skipIfBlocked: true, + destChan: ch, + }) + + return ch +} + +// Move clock by Duration, notify anyone that's called After or Tick func (f *FakeClock) Step(d time.Duration) { f.lock.Lock() defer f.lock.Unlock() @@ -126,7 +148,23 @@ func (f *FakeClock) setTimeLocked(t time.Time) { for i := range f.waiters { w := &f.waiters[i] if !w.targetTime.After(t) { - w.destChan <- t + + if w.skipIfBlocked { + select { + case w.destChan <- t: + default: + } + } else { + w.destChan <- t + } + + if w.stepInterval > 0 { + for !w.targetTime.After(t) { + w.targetTime = w.targetTime.Add(w.stepInterval) + } + newWaiters = append(newWaiters, *w) + } + } else { newWaiters = append(newWaiters, f.waiters[i]) } @@ -169,6 +207,12 @@ func (*IntervalClock) After(d time.Duration) <-chan time.Time { panic("IntervalClock doesn't implement After") } +// Unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) Tick(d time.Duration) <-chan time.Time { + panic("IntervalClock doesn't implement Tick") +} + func (*IntervalClock) Sleep(d time.Duration) { panic("IntervalClock doesn't implement Sleep") } diff --git a/pkg/util/clock_test.go b/pkg/util/clock_test.go index 8205541e8b1..ee60fcb0d23 100644 --- a/pkg/util/clock_test.go +++ b/pkg/util/clock_test.go @@ -104,3 +104,81 @@ func TestFakeAfter(t *testing.T) { t.Errorf("unexpected non-channel read") } } + +func TestFakeTick(t *testing.T) { + tc := NewFakeClock(time.Now()) + if tc.HasWaiters() { + t.Errorf("unexpected waiter?") + } + oneSec := tc.Tick(time.Second) + if !tc.HasWaiters() { + t.Errorf("unexpected lack of waiter?") + } + + oneOhOneSec := tc.Tick(time.Second + time.Millisecond) + twoSec := tc.Tick(2 * time.Second) + select { + case <-oneSec: + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + } + + tc.Step(999 * time.Millisecond) // t=.999 + select { + case <-oneSec: + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + } + + tc.Step(time.Millisecond) // t=1.000 + select { + case <-oneSec: + // Expected! + case <-oneOhOneSec: + t.Errorf("unexpected channel read") + case <-twoSec: + t.Errorf("unexpected channel read") + default: + t.Errorf("unexpected non-channel read") + } + tc.Step(time.Millisecond) // t=1.001 + select { + case <-oneSec: + // should not double-trigger! + t.Errorf("unexpected channel read") + case <-oneOhOneSec: + // Expected! + case <-twoSec: + t.Errorf("unexpected channel read") + default: + t.Errorf("unexpected non-channel read") + } + + tc.Step(time.Second) // t=2.001 + tc.Step(time.Second) // t=3.001 + tc.Step(time.Second) // t=4.001 + tc.Step(time.Second) // t=5.001 + + // The one second ticker should not accumulate ticks + accumulatedTicks := 0 + drained := false + for !drained { + select { + case <-oneSec: + accumulatedTicks++ + default: + drained = true + } + } + if accumulatedTicks != 1 { + t.Errorf("unexpected number of accumulated ticks: %d", accumulatedTicks) + } +} diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go index 68098c5b895..429d15c4a1e 100644 --- a/pkg/util/workqueue/delaying_queue.go +++ b/pkg/util/workqueue/delaying_queue.go @@ -41,7 +41,7 @@ func newDelayingQueue(clock util.Clock) DelayingInterface { ret := &delayingType{ Interface: New(), clock: clock, - heartbeat: time.Tick(maxWait), + heartbeat: clock.Tick(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan waitFor, 1000), }