From 459188ce25cf4af2f6b9912e6167bc96a1e96b23 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 1 May 2025 23:28:37 +0000 Subject: [PATCH] Port BoundedFrequencyRunner from flowcontrol.RateLimiter to clock.Clock Co-authored-by: Dan Winship --- pkg/proxy/runner/bounded_frequency_runner.go | 207 ++----- .../runner/bounded_frequency_runner_test.go | 577 ++++++++++-------- 2 files changed, 365 insertions(+), 419 deletions(-) diff --git a/pkg/proxy/runner/bounded_frequency_runner.go b/pkg/proxy/runner/bounded_frequency_runner.go index 525fa8f3c6d..b282381488d 100644 --- a/pkg/proxy/runner/bounded_frequency_runner.go +++ b/pkg/proxy/runner/bounded_frequency_runner.go @@ -18,12 +18,11 @@ package runner import ( "fmt" - "sync" "time" - "k8s.io/client-go/util/flowcontrol" - + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog/v2" + "k8s.io/utils/clock" ) // BoundedFrequencyRunner manages runs of a user-provided work function. @@ -36,96 +35,14 @@ type BoundedFrequencyRunner struct { run chan struct{} // try an async run - mu sync.Mutex // guards runs of fn and all mutations - fn func() error // the work function - lastRun time.Time // time of last run - timer timer // timer for deferred runs - limiter rateLimiter // rate limiter for on-demand runs + fn func() error // the work function + minIntervalTimer clock.Timer + nextRunTimer clock.Timer // Combined timer for maxInterval and retryInterval logic + clock clock.Clock } -// designed so that flowcontrol.RateLimiter satisfies -type rateLimiter interface { - TryAccept() bool - Stop() -} - -type nullLimiter struct{} - -func (nullLimiter) TryAccept() bool { - return true -} - -func (nullLimiter) Stop() {} - -var _ rateLimiter = nullLimiter{} - -// for testing -type timer interface { - // C returns the timer's selectable channel. - C() <-chan time.Time - - // See time.Timer.Reset. - Reset(d time.Duration) bool - - // See time.Timer.Stop. - Stop() bool - - // See time.Now. 
- Now() time.Time - - // Remaining returns the time until the timer will go off (if it is running). - Remaining() time.Duration - - // See time.Since. - Since(t time.Time) time.Duration - - // See time.Sleep. - Sleep(d time.Duration) -} - -// implement our timer in terms of std time.Timer. -type realTimer struct { - timer *time.Timer - next time.Time -} - -func (rt *realTimer) C() <-chan time.Time { - return rt.timer.C -} - -func (rt *realTimer) Reset(d time.Duration) bool { - rt.next = time.Now().Add(d) - return rt.timer.Reset(d) -} - -func (rt *realTimer) Stop() bool { - return rt.timer.Stop() -} - -func (rt *realTimer) Now() time.Time { - return time.Now() -} - -func (rt *realTimer) Remaining() time.Duration { - return rt.next.Sub(time.Now()) -} - -func (rt *realTimer) Since(t time.Time) time.Duration { - return time.Since(t) -} - -func (rt *realTimer) Sleep(d time.Duration) { - time.Sleep(d) -} - -var _ timer = &realTimer{} - // NewBoundedFrequencyRunner creates and returns a new BoundedFrequencyRunner. -// This runner manages the execution frequency of the provided work function `fn`. -// -// All runs will be async to the caller of BoundedFrequencyRunner.Run, but -// multiple runs are serialized. If the function needs to hold locks, it must -// take them internally. +// This runner manages the execution frequency of the provided function `fn`. // // The runner guarantees two properties: // 1. Minimum Interval (`minInterval`): At least `minInterval` must pass between @@ -143,13 +60,11 @@ var _ timer = &realTimer{} // (unless another trigger, like `Run()` or `maxInterval`, causes it to run sooner). Any // successful run will abort the retry attempt. 
func NewBoundedFrequencyRunner(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration) *BoundedFrequencyRunner { - timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately - <-timer.C() // consume the first tick - return construct(name, fn, minInterval, retryInterval, maxInterval, timer) + return construct(name, fn, minInterval, retryInterval, maxInterval, clock.RealClock{}) } // Make an instance with dependencies injected. -func construct(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration, timer timer) *BoundedFrequencyRunner { +func construct(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration, clock clock.Clock) *BoundedFrequencyRunner { if maxInterval < minInterval { panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval)) } @@ -163,14 +78,9 @@ func construct(name string, fn func() error, minInterval, retryInterval, maxInte maxInterval: maxInterval, run: make(chan struct{}, 1), - timer: timer, - } - if minInterval == 0 { - bfr.limiter = nullLimiter{} - } else { - qps := float32(time.Second) / float32(minInterval) - bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, 1, timer) + clock: clock, } + return bfr } @@ -178,17 +88,56 @@ func construct(name string, fn func() error, minInterval, retryInterval, maxInte // called as a goroutine. 
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { klog.V(3).InfoS("Loop running", "runner", bfr.name) - bfr.timer.Reset(bfr.maxInterval) + defer close(bfr.run) + + bfr.minIntervalTimer = bfr.clock.NewTimer(bfr.minInterval) + defer bfr.minIntervalTimer.Stop() + + // Initialize nextRunTimer with maxInterval + bfr.nextRunTimer = bfr.clock.NewTimer(bfr.maxInterval) + defer bfr.nextRunTimer.Stop() + for { select { case <-stop: - bfr.stop() klog.V(3).InfoS("Loop stopping", "runner", bfr.name) return - case <-bfr.timer.C(): - bfr.tryRun() + case <-bfr.nextRunTimer.C(): // Wait on the single timer case <-bfr.run: - bfr.tryRun() + } + + // stop the timers here to allow the tests using the fake clock to synchronize + // with the fakeClock.HasWaiters() method. The timers are reset after the function + // is executed. + bfr.minIntervalTimer.Stop() + bfr.nextRunTimer.Stop() + + var err error + // avoid crashing if the function executed crashes + func() { + defer utilruntime.HandleCrash() + err = bfr.fn() + }() + + // Determine the next interval based on the result + nextInterval := bfr.maxInterval + if err != nil { + // If error, ensure next run is within retryInterval and maxInterval + if bfr.retryInterval < nextInterval { + nextInterval = bfr.retryInterval + } + klog.V(3).InfoS("scheduling retry", "runner", bfr.name, "interval", nextInterval, "error", err) + } + // Reset the timers + bfr.minIntervalTimer.Reset(bfr.minInterval) + bfr.nextRunTimer.Reset(nextInterval) + + // Wait for minInterval before looping + select { + case <-stop: + klog.V(3).InfoS("Loop stopping", "runner", bfr.name) + return + case <-bfr.minIntervalTimer.C(): } } } @@ -204,49 +153,3 @@ func (bfr *BoundedFrequencyRunner) Run() { default: } } - -// assumes the lock is not held -func (bfr *BoundedFrequencyRunner) stop() { - bfr.mu.Lock() - defer bfr.mu.Unlock() - bfr.limiter.Stop() - bfr.timer.Stop() -} - -// assumes the lock is not held -func (bfr *BoundedFrequencyRunner) tryRun() { - 
bfr.mu.Lock() - defer bfr.mu.Unlock() - - if bfr.limiter.TryAccept() { - // We're allowed to run the function right now. - err := bfr.fn() - - bfr.lastRun = bfr.timer.Now() - bfr.timer.Stop() - - nextInterval := bfr.maxInterval - if err != nil { - // an error will schedule a retry after the retryInterval, - // any successful run before that will stop the retry attempt. - nextInterval = bfr.retryInterval - klog.V(3).InfoS("scheduling retry", "runner", bfr.name, "interval", nextInterval, "error", err) - } - bfr.timer.Reset(nextInterval) - return - } - - // It can't run right now, figure out when it can run next. - elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run - nextPossible := bfr.minInterval - elapsed // time to next possible run - nextScheduled := bfr.timer.Remaining() // time to next scheduled run - klog.V(4).InfoS("can't run", "runner", bfr.name, "elapsed", elapsed, "nextPossible", nextPossible, "nextScheduled", nextScheduled) - - // It's hard to avoid race conditions in the unit tests unless we always reset - // the timer here, even when it's unchanged - if nextPossible < nextScheduled { - nextScheduled = nextPossible - } - bfr.timer.Stop() - bfr.timer.Reset(nextScheduled) -} diff --git a/pkg/proxy/runner/bounded_frequency_runner_test.go b/pkg/proxy/runner/bounded_frequency_runner_test.go index 5c7ea48d4f0..b8c7953c4ac 100644 --- a/pkg/proxy/runner/bounded_frequency_runner_test.go +++ b/pkg/proxy/runner/bounded_frequency_runner_test.go @@ -19,354 +19,397 @@ package runner import ( "fmt" "sync" + "sync/atomic" "testing" "time" + + clock "k8s.io/utils/clock/testing" ) // Track calls to the managed function. type receiver struct { - lock sync.Mutex - run bool - retry bool + counter atomic.Int32 + // counterCh signals completion of F() and sends the new count. + // It's unbuffered to make the send in F() blocking. 
+ counterCh chan int + resultMu sync.RWMutex + result error } func (r *receiver) F() error { - r.lock.Lock() - defer r.lock.Unlock() - r.run = true - - if r.retry { - r.retry = false - return fmt.Errorf("retry") - } - return nil + newCount := r.counter.Add(1) + // Blocking send: F() will wait here until the test reads from counterCh. + r.counterCh <- int(newCount) + r.resultMu.RLock() + defer r.resultMu.RUnlock() + return r.result } -func (r *receiver) reset() bool { - r.lock.Lock() - defer r.lock.Unlock() - was := r.run - r.run = false - return was -} - -func (r *receiver) setRetry(retry bool) { - r.lock.Lock() - defer r.lock.Unlock() - r.retry = retry -} - -// A single change event in the fake timer. -type timerUpdate struct { - active bool - next time.Duration // iff active == true -} - -// Fake time. -type fakeTimer struct { - c chan time.Time - - lock sync.Mutex - now time.Time - timeout time.Time - active bool - - updated chan timerUpdate -} - -func newFakeTimer() *fakeTimer { - ft := &fakeTimer{ - now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), - c: make(chan time.Time), - updated: make(chan timerUpdate), - } - return ft -} - -func (ft *fakeTimer) C() <-chan time.Time { - return ft.c -} - -func (ft *fakeTimer) Reset(in time.Duration) bool { - ft.lock.Lock() - defer ft.lock.Unlock() - - was := ft.active - ft.active = true - ft.timeout = ft.now.Add(in) - ft.updated <- timerUpdate{ - active: true, - next: in, - } - return was -} - -func (ft *fakeTimer) Stop() bool { - ft.lock.Lock() - defer ft.lock.Unlock() - - was := ft.active - ft.active = false - ft.updated <- timerUpdate{ - active: false, - } - return was -} - -func (ft *fakeTimer) Now() time.Time { - ft.lock.Lock() - defer ft.lock.Unlock() - - return ft.now -} - -func (ft *fakeTimer) Remaining() time.Duration { - ft.lock.Lock() - defer ft.lock.Unlock() - - return ft.timeout.Sub(ft.now) -} - -func (ft *fakeTimer) Since(t time.Time) time.Duration { - ft.lock.Lock() - defer ft.lock.Unlock() - - return 
ft.now.Sub(t) -} - -func (ft *fakeTimer) Sleep(d time.Duration) { - // ft.advance grabs ft.lock - ft.advance(d) -} - -// advance the current time. -func (ft *fakeTimer) advance(d time.Duration) { - ft.lock.Lock() - defer ft.lock.Unlock() - - ft.now = ft.now.Add(d) - if ft.active && !ft.now.Before(ft.timeout) { - ft.active = false - ft.c <- ft.timeout +func newReceiver() *receiver { + return &receiver{ + counterCh: make(chan int), } } -// return the calling line number (for printing) -// test the timer's state -func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) { - t.Helper() - if upd.active != active { - t.Fatalf("%s: expected timer active=%v", name, active) - } - if active && upd.next != next { - t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next) - } +func (r *receiver) calls() <-chan int { + return r.counterCh } -// test and reset the receiver's state -func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) { - t.Helper() - triggered := receiver.reset() - if expected && !triggered { - t.Fatalf("%s: function should have been called", name) - } else if !expected && triggered { - t.Fatalf("%s: function should not have been called", name) - } +func (r *receiver) setReturnValue(err error) { + r.resultMu.Lock() + defer r.resultMu.Unlock() + r.result = err } -// Durations embedded in test cases depend on these. 
-var minInterval = 1 * time.Second -var retryInterval = 5 * time.Second -var maxInterval = 10 * time.Second - -func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) { - t.Helper() - upd := <-timer.updated // wait for stop - checkReceiver(name, t, obj, expectCall) - checkReceiver(name, t, obj, false) // prove post-condition - checkTimer(name, t, upd, false, 0) - upd = <-timer.updated // wait for reset - checkTimer(name, t, upd, true, expectNext) -} - -func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { - t.Helper() - waitForReset(name, t, timer, obj, true, maxInterval) -} - -func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { - t.Helper() - waitForReset(name, t, timer, obj, true, expectNext) -} - -func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { - t.Helper() - waitForReset(name, t, timer, obj, false, expectNext) -} - -func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) { +// assertCalls waits for the receiver's function to be called and asserts that +// the total call count matches expectedCalls. It fails the test if the timeout is reached +// or if the call count doesn't match. +func assertCalls(t *testing.T, r *receiver, expectedCalls int) { t.Helper() select { - case <-timer.c: - t.Fatalf("%s: unexpected timer tick", name) - case upd := <-timer.updated: - t.Fatalf("%s: unexpected timer update %v", name, upd) - default: + case calls := <-r.calls(): + if calls != expectedCalls { + t.Fatalf("expected %d calls, but got %d", expectedCalls, calls) + } + case <-time.After(1 * time.Second): + t.Fatalf("timed out waiting for function execution (expected %d calls, got %d)", expectedCalls, r.counter.Load()) + } +} + +// assertNoCalls waits for 100 millisecond and asserts that the receiver's +// function was *not* called during that time. 
It fails the test if a call is detected. +func assertNoCalls(t *testing.T, r *receiver) { + t.Helper() + select { + case calls := <-r.calls(): + t.Fatalf("unexpected function execution detected (call count: %d)", calls) + case <-time.After(100 * time.Millisecond): } - checkReceiver(name, t, obj, false) } func Test_BoundedFrequencyRunner(t *testing.T) { - obj := &receiver{} - timer := newFakeTimer() - runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, timer) + var minInterval = 1 * time.Second + var retryInterval = 5 * time.Second + var maxInterval = 10 * time.Second + obj := newReceiver() + fakeClock := clock.NewFakeClock(time.Now()) + runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, fakeClock) stop := make(chan struct{}) + defer close(stop) - var upd timerUpdate - - // Start. go runner.Loop(stop) - upd = <-timer.updated // wait for initial time to be set to max - checkTimer("init", t, upd, true, maxInterval) - checkReceiver("init", t, obj, false) // Run once, immediately. // rel=0ms runner.Run() - waitForRun("first run", t, timer, obj) - - // Run again, before minInterval expires. - timer.advance(500 * time.Millisecond) // rel=500ms + assertCalls(t, obj, 1) + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } + // Run again, before minInterval expires. No execution expected. + fakeClock.Step(500 * time.Millisecond) // rel=500ms runner.Run() - waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond) + assertNoCalls(t, obj) - // Run again, before minInterval expires. - timer.advance(499 * time.Millisecond) // rel=999ms + // Run again, before minInterval expires. No execution expected. 
+ fakeClock.Step(499 * time.Millisecond) // rel=999ms runner.Run() - waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond) + assertNoCalls(t, obj) // Do the deferred run - timer.advance(1 * time.Millisecond) // rel=1000ms - waitForRun("second run", t, timer, obj) + fakeClock.Step(1 * time.Millisecond) // rel=1000ms + assertCalls(t, obj, 2) - // Try again immediately runner.Run() - waitForDefer("too soon after second", t, timer, obj, 1*time.Second) + assertNoCalls(t, obj) - // Run again, before minInterval expires. - timer.advance(1 * time.Millisecond) // rel=1ms + // Run again, before minInterval expires. No execution expected. + fakeClock.Step(1 * time.Millisecond) // rel=1ms runner.Run() - waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond) + assertNoCalls(t, obj) - // Ensure that we don't run again early - timer.advance(998 * time.Millisecond) // rel=999ms - waitForNothing("premature", t, timer, obj) + // Ensure that we don't run again early. No execution expected. + fakeClock.Step(998 * time.Millisecond) // rel=999ms + assertNoCalls(t, obj) // Do the deferred run - timer.advance(1 * time.Millisecond) // rel=1000ms - waitForRun("third run", t, timer, obj) - - // Let minInterval pass, but there are no runs queued - timer.advance(1 * time.Second) // rel=1000ms - waitForNothing("minInterval", t, timer, obj) + fakeClock.Step(1 * time.Millisecond) // rel=1000ms + assertCalls(t, obj, 3) + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } + // Let minInterval pass, but there are no runs queued. No execution expected. + fakeClock.Step(1 * time.Second) // rel=1000ms + assertNoCalls(t, obj) // Let maxInterval pass - timer.advance(9 * time.Second) // rel=10000ms - waitForRun("maxInterval", t, timer, obj) - - // Run again, before minInterval expires. 
- timer.advance(1 * time.Millisecond) // rel=1ms + fakeClock.Step(maxInterval) // rel=10000ms + assertCalls(t, obj, 4) + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } + // Run again, before minInterval expires. No execution expected. + fakeClock.Step(1 * time.Millisecond) // rel=1ms runner.Run() - waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond) + assertNoCalls(t, obj) // Let minInterval pass - timer.advance(999 * time.Millisecond) // rel=1000ms - waitForRun("fifth run", t, timer, obj) - - // Clean up. - stop <- struct{}{} - // a message is sent to time.updated in func Stop() at the end of the child goroutine - // to terminate the child, a receive on time.updated is needed here - <-timer.updated + fakeClock.Step(999 * time.Millisecond) // rel=1000ms + assertCalls(t, obj, 5) } func Test_BoundedFrequencyRunnerRetry(t *testing.T) { - obj := &receiver{} - timer := newFakeTimer() - runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, timer) + var minInterval = 1 * time.Second + var retryInterval = 5 * time.Second + var maxInterval = 10 * time.Second + obj := newReceiver() + fakeClock := clock.NewFakeClock(time.Now()) + runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, fakeClock) stop := make(chan struct{}) + defer close(stop) - var upd timerUpdate - - // Start. 
go runner.Loop(stop) - upd = <-timer.updated // wait for initial time to be set to max - checkTimer("init", t, upd, true, maxInterval) - checkReceiver("init", t, obj, false) // Run once, immediately, and queue a retry // rel=0ms - obj.setRetry(true) + obj.setReturnValue(fmt.Errorf("sync error")) runner.Run() - waitForRunWithRetry("first run", t, timer, obj, 5*time.Second) + assertCalls(t, obj, 1) + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } + + // next run will succeed + obj.setReturnValue(nil) + assertNoCalls(t, obj) // Nothing happens... - timer.advance(time.Second) // rel=1000ms - waitForNothing("minInterval, nothing queued", t, timer, obj) + fakeClock.Step(minInterval) // rel=1000ms + assertNoCalls(t, obj) // After retryInterval, function is called - timer.advance(4 * time.Second) // rel=5000ms - waitForRun("retry", t, timer, obj) + fakeClock.Step(4 * time.Second) // rel=5000ms + assertCalls(t, obj, 2) + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } - // Run again, before minInterval expires. 
- timer.advance(499 * time.Millisecond) // rel=499ms + // Run again, before minInterval expires and trigger a retry + fakeClock.Step(499 * time.Millisecond) // rel=499ms + obj.setReturnValue(fmt.Errorf("sync error")) runner.Run() - waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond) + assertNoCalls(t, obj) - // Do the deferred run, have it queue another retry - obj.setRetry(true) - timer.advance(501 * time.Millisecond) // rel=1000ms - waitForRunWithRetry("second run", t, timer, obj, 5*time.Second) + // Do the deferred run, queue another retry after it returns + fakeClock.Step(501 * time.Millisecond) // rel=1000ms + assertCalls(t, obj, 3) + + // next run will succeed + obj.setReturnValue(nil) + assertNoCalls(t, obj) // Wait for minInterval to pass - timer.advance(time.Second) // rel=1000ms - waitForNothing("minInterval, nothing queued", t, timer, obj) + fakeClock.Step(time.Second) // rel=1000ms + assertNoCalls(t, obj) - // Now do another run + // Now do another successful run that aborts the retry runner.Run() - waitForRun("third run", t, timer, obj) + assertCalls(t, obj, 4) + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } // Retry was cancelled because we already ran - timer.advance(4 * time.Second) - waitForNothing("retry cancelled", t, timer, obj) + fakeClock.Step(4 * time.Second) + assertNoCalls(t, obj) - // Run and request a retry - obj.setRetry(true) + // New run will trigger a retry. 
+ obj.setReturnValue(fmt.Errorf("sync error")) runner.Run() - waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second) + assertCalls(t, obj, 5) + for fakeClock.Waiters() != 2 { // wait for the timers to be reset + time.Sleep(1 * time.Millisecond) + } + + // next run will succeed + obj.setReturnValue(nil) + assertNoCalls(t, obj) // Call Run again before minInterval passes - timer.advance(100 * time.Millisecond) // rel=100ms + fakeClock.Step(100 * time.Millisecond) // rel=100ms runner.Run() - waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond) + assertNoCalls(t, obj) // Deferred run will run after minInterval passes - timer.advance(900 * time.Millisecond) // rel=1000ms - waitForRun("fifth run", t, timer, obj) + fakeClock.Step(900 * time.Millisecond) // rel=1000ms + assertCalls(t, obj, 6) + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } // Retry was cancelled because we already ran - timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter - waitForNothing("retry cancelled", t, timer, obj) + fakeClock.Step(4 * time.Second) // rel=4s since run, 5s since RetryAfter + assertNoCalls(t, obj) // Rerun happens after maxInterval - timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter - waitForNothing("premature", t, timer, obj) - timer.advance(time.Second) // rel=10s since run - waitForRun("maxInterval", t, timer, obj) + fakeClock.Step(5 * time.Second) // rel=9s since run, 10s since RetryAfter + assertNoCalls(t, obj) - // Clean up. 
- stop <- struct{}{} - // a message is sent to time.updated in func Stop() at the end of the child goroutine - // to terminate the child, a receive on time.updated is needed here - <-timer.updated + fakeClock.Step(time.Second) // rel=10s since run + assertCalls(t, obj, 7) +} + +func Test_BoundedFrequencyRunnerRetryShorterThanMinInterval(t *testing.T) { + var minInterval = 5 * time.Second + var retryInterval = 1 * time.Second // Shorter than minInterval + var maxInterval = 10 * time.Second + obj := newReceiver() + fakeClock := clock.NewFakeClock(time.Now()) + runner := construct("test-runner-short-retry", obj.F, minInterval, retryInterval, maxInterval, fakeClock) + stop := make(chan struct{}) + defer close(stop) + + go runner.Loop(stop) + + // Run once immediately and trigger a retry. + // rel=0s + obj.setReturnValue(fmt.Errorf("sync error")) + runner.Run() + assertCalls(t, obj, 1) + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } + + // next run will succeed + obj.setReturnValue(nil) + assertNoCalls(t, obj) + + // Advance clock past retryInterval, but still within minInterval. + // rel=1s + fakeClock.Step(retryInterval) + assertNoCalls(t, obj) // Still shouldn't run because minInterval hasn't passed since run 1 finished. + + // Advance clock just before minInterval expires. + // rel=4.999s + fakeClock.Step(minInterval - retryInterval - 1*time.Millisecond) + assertNoCalls(t, obj) + + // Advance clock past minInterval. The retry should now trigger the run. + // rel=5s + fakeClock.Step(1 * time.Millisecond) + assertCalls(t, obj, 2) // Run happens now, triggered by the earlier retry, respecting minInterval. + // wait for the timers to be reset + for fakeClock.Waiters() != 2 { + time.Sleep(1 * time.Millisecond) + } + // Let maxInterval pass without any Run() or Retry() calls. 
+ fakeClock.Step(maxInterval) // rel=10s since run 2 + assertCalls(t, obj, 3) +} + +func TestBoundedFrequencyRunner_Run_RunsAgainAfterMinInterval_RealClock(t *testing.T) { + // Use relatively short intervals for real clock testing + var minInterval = 500 * time.Millisecond + var retryInterval = 800 * time.Millisecond + var maxInterval = 1500 * time.Millisecond + obj := newReceiver() + runner := NewBoundedFrequencyRunner("test-runner", obj.F, minInterval, retryInterval, maxInterval) + + stopCh := make(chan struct{}) + defer close(stopCh) + go runner.Loop(stopCh) + + runner.Run() // First run + assertCalls(t, obj, 1) + + time.Sleep(2 * minInterval) + assertNoCalls(t, obj) + + runner.Run() // Second run + assertCalls(t, obj, 2) +} + +func TestBoundedFrequencyRunner_Run_DoesNotRunBeforeMinInterval_RealClock(t *testing.T) { + // Use relatively short intervals for real clock testing + var minInterval = 500 * time.Millisecond + var retryInterval = 800 * time.Millisecond + var maxInterval = 1500 * time.Millisecond + obj := newReceiver() + runner := NewBoundedFrequencyRunner("test-runner", obj.F, minInterval, retryInterval, maxInterval) + + stopCh := make(chan struct{}) + defer close(stopCh) + go runner.Loop(stopCh) + + runner.Run() // First run + assertCalls(t, obj, 1) + + time.Sleep(minInterval / 4) + runner.Run() + assertNoCalls(t, obj) +} + +func TestBoundedFrequencyRunner_RunAfterMaxInterval_RealClock(t *testing.T) { + // Use relatively short intervals for real clock testing + var minInterval = 100 * time.Millisecond + var retryInterval = 200 * time.Millisecond + var maxInterval = 500 * time.Millisecond + obj := newReceiver() + runner := NewBoundedFrequencyRunner("test-runner", obj.F, minInterval, retryInterval, maxInterval) + + stopCh := make(chan struct{}) + defer close(stopCh) + go runner.Loop(stopCh) + + assertNoCalls(t, obj) + + time.Sleep(maxInterval) + assertCalls(t, obj, 1) +} + +func Test_BoundedFrequencyRunnerRetry_RealClock(t *testing.T) { + // Use 
relatively short intervals for real clock testing + var minInterval = 100 * time.Millisecond + var retryInterval = 500 * time.Millisecond + var maxInterval = 10 * time.Second + + obj := newReceiver() + // Use the real clock constructor + runner := NewBoundedFrequencyRunner("test-runner-real-clock", obj.F, minInterval, retryInterval, maxInterval) + + stopCh := make(chan struct{}) + defer close(stopCh) + go runner.Loop(stopCh) + + t.Log("Triggering first retry") + // Run once immediately and trigger a retry. + // rel=0s + obj.setReturnValue(fmt.Errorf("sync error")) + runner.Run() + assertCalls(t, obj, 1) + + // Check before retryInterval + time.Sleep(retryInterval / 4) + assertNoCalls(t, obj) + + // Check after retryInterval + time.Sleep(retryInterval) // Wait past retryInterval + assertCalls(t, obj, 2) + + // Check after retryInterval (relative to the *first* Retry call in this batch) + time.Sleep(retryInterval) + assertCalls(t, obj, 3) + + time.Sleep(retryInterval / 8) + assertNoCalls(t, obj) + + time.Sleep(retryInterval) // Wait past the new retryInterval + assertCalls(t, obj, 4) }