/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package async import ( "sync" "testing" "time" ) // Track calls to the managed function. type receiver struct { lock sync.Mutex run bool } func (r *receiver) F() { r.lock.Lock() defer r.lock.Unlock() r.run = true } func (r *receiver) reset() bool { r.lock.Lock() defer r.lock.Unlock() was := r.run r.run = false return was } // A single change event in the fake timer. type timerUpdate struct { active bool next time.Duration // iff active == true } // Fake time. type fakeTimer struct { c chan time.Time lock sync.Mutex now time.Time timeout time.Time active bool updated chan timerUpdate } func newFakeTimer() *fakeTimer { ft := &fakeTimer{ now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), c: make(chan time.Time), updated: make(chan timerUpdate), } return ft } func (ft *fakeTimer) C() <-chan time.Time { return ft.c } func (ft *fakeTimer) Reset(in time.Duration) bool { ft.lock.Lock() defer ft.lock.Unlock() was := ft.active ft.active = true ft.timeout = ft.now.Add(in) ft.updated <- timerUpdate{ active: true, next: in, } return was } func (ft *fakeTimer) Stop() bool { ft.lock.Lock() defer ft.lock.Unlock() was := ft.active ft.active = false ft.updated <- timerUpdate{ active: false, } return was } func (ft *fakeTimer) Now() time.Time { ft.lock.Lock() defer ft.lock.Unlock() return ft.now } func (ft *fakeTimer) Since(t time.Time) time.Duration { ft.lock.Lock() defer ft.lock.Unlock() return ft.now.Sub(t) } func (ft *fakeTimer) Sleep(d time.Duration) { // ft.advance grabs ft.lock ft.advance(d) } // advance the current time. func (ft *fakeTimer) advance(d time.Duration) { ft.lock.Lock() defer ft.lock.Unlock() ft.now = ft.now.Add(d) if ft.active && !ft.now.Before(ft.timeout) { ft.active = false ft.c <- ft.timeout } } // return the calling line number (for printing) // test the timer's state func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) { if upd.active != active { t.Fatalf("%s: expected timer active=%v", name, active) } if active && upd.next != next { t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next) } } // test and reset the receiver's state func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) { triggered := receiver.reset() if expected && !triggered { t.Fatalf("%s: function should have been called", name) } else if !expected && triggered { t.Fatalf("%s: function should not have been called", name) } } // Durations embedded in test cases depend on these. var minInterval = 1 * time.Second var maxInterval = 10 * time.Second func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) { upd := <-timer.updated // wait for stop checkReceiver(name, t, obj, expectCall) checkReceiver(name, t, obj, false) // prove post-condition checkTimer(name, t, upd, false, 0) upd = <-timer.updated // wait for reset checkTimer(name, t, upd, true, expectNext) } func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { waitForReset(name, t, timer, obj, true, maxInterval) } func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { waitForReset(name, t, timer, obj, false, expectNext) } func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) { select { case <-timer.c: t.Fatalf("%s: unexpected timer tick", name) case upd := <-timer.updated: t.Fatalf("%s: unexpected timer update %v", name, upd) default: } checkReceiver(name, t, obj, false) } func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { obj := &receiver{} timer := newFakeTimer() runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) stop := make(chan struct{}) var upd timerUpdate // Start. go runner.Loop(stop) upd = <-timer.updated // wait for initial time to be set to max checkTimer("init", t, upd, true, maxInterval) checkReceiver("init", t, obj, false) // Run once, immediately. // rel=0ms runner.Run() waitForRun("first run", t, timer, obj) // Run again, before minInterval expires. timer.advance(500 * time.Millisecond) // rel=500ms runner.Run() waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond) // Run again, before minInterval expires. timer.advance(499 * time.Millisecond) // rel=999ms runner.Run() waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond) // Do the deferred run timer.advance(1 * time.Millisecond) // rel=1000ms waitForRun("second run", t, timer, obj) // Try again immediately runner.Run() waitForDefer("too soon after second", t, timer, obj, 1*time.Second) // Run again, before minInterval expires. timer.advance(1 * time.Millisecond) // rel=1ms runner.Run() waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond) // Ensure that we don't run again early timer.advance(998 * time.Millisecond) // rel=999ms waitForNothing("premature", t, timer, obj) // Do the deferred run timer.advance(1 * time.Millisecond) // rel=1000ms waitForRun("third run", t, timer, obj) // Let minInterval pass, but there are no runs queued timer.advance(1 * time.Second) // rel=1000ms waitForNothing("minInterval", t, timer, obj) // Let maxInterval pass timer.advance(9 * time.Second) // rel=10000ms waitForRun("maxInterval", t, timer, obj) // Run again, before minInterval expires. timer.advance(1 * time.Millisecond) // rel=1ms runner.Run() waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond) // Let minInterval pass timer.advance(999 * time.Millisecond) // rel=1000ms waitForRun("fourth run", t, timer, obj) // Clean up. stop <- struct{}{} } func Test_BoundedFrequencyRunnerBurst(t *testing.T) { obj := &receiver{} timer := newFakeTimer() runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer) stop := make(chan struct{}) var upd timerUpdate // Start. go runner.Loop(stop) upd = <-timer.updated // wait for initial time to be set to max checkTimer("init", t, upd, true, maxInterval) checkReceiver("init", t, obj, false) // Run once, immediately. // abs=0ms, rel=0ms runner.Run() waitForRun("first run", t, timer, obj) // Run again, before minInterval expires, with burst. timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms runner.Run() waitForRun("second run", t, timer, obj) // Run again, before minInterval expires. timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms runner.Run() waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond) // Run again, before minInterval expires. timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms runner.Run() waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond) // Run again, before minInterval expires. timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms runner.Run() waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond) // Advance timer enough to replenish bursts, but not enough to be minInterval // after the last run timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms waitForNothing("not minInterval", t, timer, obj) runner.Run() waitForRun("third run", t, timer, obj) // Run again, before minInterval expires. timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms runner.Run() waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond) // Run again, before minInterval expires. timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms runner.Run() waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond) // Advance and do the deferred run timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms waitForRun("fourth run", t, timer, obj) // Run again, once burst has fully replenished. timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms runner.Run() waitForRun("fifth run", t, timer, obj) runner.Run() waitForRun("sixth run", t, timer, obj) runner.Run() waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second) // Wait until minInterval after the last run timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms waitForRun("seventh run", t, timer, obj) // Wait for maxInterval timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms waitForRun("maxInterval", t, timer, obj) // Clean up. stop <- struct{}{} }