mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-02-21 22:57:15 +00:00
Port BoundedFrequencyRunner from flowcontrol.RateLimiter to clock.Clock
Co-authored-by: Dan Winship <danwinship@redhat.com>
This commit is contained in:
committed by
Dan Winship
parent
eae17c21b0
commit
459188ce25
@@ -18,12 +18,11 @@ package runner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// BoundedFrequencyRunner manages runs of a user-provided work function.
|
||||
@@ -36,96 +35,14 @@ type BoundedFrequencyRunner struct {
|
||||
|
||||
run chan struct{} // try an async run
|
||||
|
||||
mu sync.Mutex // guards runs of fn and all mutations
|
||||
fn func() error // the work function
|
||||
lastRun time.Time // time of last run
|
||||
timer timer // timer for deferred runs
|
||||
limiter rateLimiter // rate limiter for on-demand runs
|
||||
fn func() error // the work function
|
||||
minIntervalTimer clock.Timer
|
||||
nextRunTimer clock.Timer // Combined timer for maxInterval and retryInterval logic
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
// designed so that flowcontrol.RateLimiter satisfies
|
||||
type rateLimiter interface {
|
||||
TryAccept() bool
|
||||
Stop()
|
||||
}
|
||||
|
||||
type nullLimiter struct{}
|
||||
|
||||
func (nullLimiter) TryAccept() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (nullLimiter) Stop() {}
|
||||
|
||||
var _ rateLimiter = nullLimiter{}
|
||||
|
||||
// for testing
|
||||
type timer interface {
|
||||
// C returns the timer's selectable channel.
|
||||
C() <-chan time.Time
|
||||
|
||||
// See time.Timer.Reset.
|
||||
Reset(d time.Duration) bool
|
||||
|
||||
// See time.Timer.Stop.
|
||||
Stop() bool
|
||||
|
||||
// See time.Now.
|
||||
Now() time.Time
|
||||
|
||||
// Remaining returns the time until the timer will go off (if it is running).
|
||||
Remaining() time.Duration
|
||||
|
||||
// See time.Since.
|
||||
Since(t time.Time) time.Duration
|
||||
|
||||
// See time.Sleep.
|
||||
Sleep(d time.Duration)
|
||||
}
|
||||
|
||||
// implement our timer in terms of std time.Timer.
|
||||
type realTimer struct {
|
||||
timer *time.Timer
|
||||
next time.Time
|
||||
}
|
||||
|
||||
func (rt *realTimer) C() <-chan time.Time {
|
||||
return rt.timer.C
|
||||
}
|
||||
|
||||
func (rt *realTimer) Reset(d time.Duration) bool {
|
||||
rt.next = time.Now().Add(d)
|
||||
return rt.timer.Reset(d)
|
||||
}
|
||||
|
||||
func (rt *realTimer) Stop() bool {
|
||||
return rt.timer.Stop()
|
||||
}
|
||||
|
||||
func (rt *realTimer) Now() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
func (rt *realTimer) Remaining() time.Duration {
|
||||
return rt.next.Sub(time.Now())
|
||||
}
|
||||
|
||||
func (rt *realTimer) Since(t time.Time) time.Duration {
|
||||
return time.Since(t)
|
||||
}
|
||||
|
||||
func (rt *realTimer) Sleep(d time.Duration) {
|
||||
time.Sleep(d)
|
||||
}
|
||||
|
||||
var _ timer = &realTimer{}
|
||||
|
||||
// NewBoundedFrequencyRunner creates and returns a new BoundedFrequencyRunner.
|
||||
// This runner manages the execution frequency of the provided work function `fn`.
|
||||
//
|
||||
// All runs will be async to the caller of BoundedFrequencyRunner.Run, but
|
||||
// multiple runs are serialized. If the function needs to hold locks, it must
|
||||
// take them internally.
|
||||
// This runner manages the execution frequency of the provided function `fn`.
|
||||
//
|
||||
// The runner guarantees two properties:
|
||||
// 1. Minimum Interval (`minInterval`): At least `minInterval` must pass between
|
||||
@@ -143,13 +60,11 @@ var _ timer = &realTimer{}
|
||||
// (unless another trigger, like `Run()` or `maxInterval`, causes it to run sooner). Any
|
||||
// successful run will abort the retry attempt.
|
||||
func NewBoundedFrequencyRunner(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration) *BoundedFrequencyRunner {
|
||||
timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
|
||||
<-timer.C() // consume the first tick
|
||||
return construct(name, fn, minInterval, retryInterval, maxInterval, timer)
|
||||
return construct(name, fn, minInterval, retryInterval, maxInterval, clock.RealClock{})
|
||||
}
|
||||
|
||||
// Make an instance with dependencies injected.
|
||||
func construct(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration, timer timer) *BoundedFrequencyRunner {
|
||||
func construct(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration, clock clock.Clock) *BoundedFrequencyRunner {
|
||||
if maxInterval < minInterval {
|
||||
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
|
||||
}
|
||||
@@ -163,14 +78,9 @@ func construct(name string, fn func() error, minInterval, retryInterval, maxInte
|
||||
maxInterval: maxInterval,
|
||||
|
||||
run: make(chan struct{}, 1),
|
||||
timer: timer,
|
||||
}
|
||||
if minInterval == 0 {
|
||||
bfr.limiter = nullLimiter{}
|
||||
} else {
|
||||
qps := float32(time.Second) / float32(minInterval)
|
||||
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, 1, timer)
|
||||
clock: clock,
|
||||
}
|
||||
|
||||
return bfr
|
||||
}
|
||||
|
||||
@@ -178,17 +88,56 @@ func construct(name string, fn func() error, minInterval, retryInterval, maxInte
|
||||
// called as a goroutine.
|
||||
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
|
||||
klog.V(3).InfoS("Loop running", "runner", bfr.name)
|
||||
bfr.timer.Reset(bfr.maxInterval)
|
||||
defer close(bfr.run)
|
||||
|
||||
bfr.minIntervalTimer = bfr.clock.NewTimer(bfr.minInterval)
|
||||
defer bfr.minIntervalTimer.Stop()
|
||||
|
||||
// Initialize nextRunTimer with maxInterval
|
||||
bfr.nextRunTimer = bfr.clock.NewTimer(bfr.maxInterval)
|
||||
defer bfr.nextRunTimer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
bfr.stop()
|
||||
klog.V(3).InfoS("Loop stopping", "runner", bfr.name)
|
||||
return
|
||||
case <-bfr.timer.C():
|
||||
bfr.tryRun()
|
||||
case <-bfr.nextRunTimer.C(): // Wait on the single timer
|
||||
case <-bfr.run:
|
||||
bfr.tryRun()
|
||||
}
|
||||
|
||||
// stop the timers here to allow the tests using the fake clock to synchronize
|
||||
// with the fakeClock.HasWaiters() method. The timers are reset after the function
|
||||
// is executed.
|
||||
bfr.minIntervalTimer.Stop()
|
||||
bfr.nextRunTimer.Stop()
|
||||
|
||||
var err error
|
||||
// avoid crashing if the function executed crashes
|
||||
func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
err = bfr.fn()
|
||||
}()
|
||||
|
||||
// Determine the next interval based on the result
|
||||
nextInterval := bfr.maxInterval
|
||||
if err != nil {
|
||||
// If error, ensure next run is within retryInterval and maxInterval
|
||||
if bfr.retryInterval < nextInterval {
|
||||
nextInterval = bfr.retryInterval
|
||||
}
|
||||
klog.V(3).InfoS("scheduling retry", "runner", bfr.name, "interval", nextInterval, "error", err)
|
||||
}
|
||||
// Reset the timers
|
||||
bfr.minIntervalTimer.Reset(bfr.minInterval)
|
||||
bfr.nextRunTimer.Reset(nextInterval)
|
||||
|
||||
// Wait for minInterval before looping
|
||||
select {
|
||||
case <-stop:
|
||||
klog.V(3).InfoS("Loop stopping", "runner", bfr.name)
|
||||
return
|
||||
case <-bfr.minIntervalTimer.C():
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -204,49 +153,3 @@ func (bfr *BoundedFrequencyRunner) Run() {
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// assumes the lock is not held
|
||||
func (bfr *BoundedFrequencyRunner) stop() {
|
||||
bfr.mu.Lock()
|
||||
defer bfr.mu.Unlock()
|
||||
bfr.limiter.Stop()
|
||||
bfr.timer.Stop()
|
||||
}
|
||||
|
||||
// assumes the lock is not held
|
||||
func (bfr *BoundedFrequencyRunner) tryRun() {
|
||||
bfr.mu.Lock()
|
||||
defer bfr.mu.Unlock()
|
||||
|
||||
if bfr.limiter.TryAccept() {
|
||||
// We're allowed to run the function right now.
|
||||
err := bfr.fn()
|
||||
|
||||
bfr.lastRun = bfr.timer.Now()
|
||||
bfr.timer.Stop()
|
||||
|
||||
nextInterval := bfr.maxInterval
|
||||
if err != nil {
|
||||
// an error will schedule a retry after the retryInterval,
|
||||
// any successful run before that will stop the retry attempt.
|
||||
nextInterval = bfr.retryInterval
|
||||
klog.V(3).InfoS("scheduling retry", "runner", bfr.name, "interval", nextInterval, "error", err)
|
||||
}
|
||||
bfr.timer.Reset(nextInterval)
|
||||
return
|
||||
}
|
||||
|
||||
// It can't run right now, figure out when it can run next.
|
||||
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
|
||||
nextPossible := bfr.minInterval - elapsed // time to next possible run
|
||||
nextScheduled := bfr.timer.Remaining() // time to next scheduled run
|
||||
klog.V(4).InfoS("can't run", "runner", bfr.name, "elapsed", elapsed, "nextPossible", nextPossible, "nextScheduled", nextScheduled)
|
||||
|
||||
// It's hard to avoid race conditions in the unit tests unless we always reset
|
||||
// the timer here, even when it's unchanged
|
||||
if nextPossible < nextScheduled {
|
||||
nextScheduled = nextPossible
|
||||
}
|
||||
bfr.timer.Stop()
|
||||
bfr.timer.Reset(nextScheduled)
|
||||
}
|
||||
|
||||
@@ -19,354 +19,397 @@ package runner
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clock "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
// Track calls to the managed function.
|
||||
type receiver struct {
|
||||
lock sync.Mutex
|
||||
run bool
|
||||
retry bool
|
||||
counter atomic.Int32
|
||||
// counterCh signals completion of F() and sends the new count.
|
||||
// It's unbuffered to make the send in F() blocking.
|
||||
counterCh chan int
|
||||
resultMu sync.RWMutex
|
||||
result error
|
||||
}
|
||||
|
||||
func (r *receiver) F() error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.run = true
|
||||
|
||||
if r.retry {
|
||||
r.retry = false
|
||||
return fmt.Errorf("retry")
|
||||
}
|
||||
return nil
|
||||
newCount := r.counter.Add(1)
|
||||
// Blocking send: F() will wait here until the test reads from counterCh.
|
||||
r.counterCh <- int(newCount)
|
||||
r.resultMu.RLock()
|
||||
defer r.resultMu.RUnlock()
|
||||
return r.result
|
||||
}
|
||||
|
||||
func (r *receiver) reset() bool {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
was := r.run
|
||||
r.run = false
|
||||
return was
|
||||
}
|
||||
|
||||
func (r *receiver) setRetry(retry bool) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.retry = retry
|
||||
}
|
||||
|
||||
// A single change event in the fake timer.
|
||||
type timerUpdate struct {
|
||||
active bool
|
||||
next time.Duration // iff active == true
|
||||
}
|
||||
|
||||
// Fake time.
|
||||
type fakeTimer struct {
|
||||
c chan time.Time
|
||||
|
||||
lock sync.Mutex
|
||||
now time.Time
|
||||
timeout time.Time
|
||||
active bool
|
||||
|
||||
updated chan timerUpdate
|
||||
}
|
||||
|
||||
func newFakeTimer() *fakeTimer {
|
||||
ft := &fakeTimer{
|
||||
now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
c: make(chan time.Time),
|
||||
updated: make(chan timerUpdate),
|
||||
}
|
||||
return ft
|
||||
}
|
||||
|
||||
func (ft *fakeTimer) C() <-chan time.Time {
|
||||
return ft.c
|
||||
}
|
||||
|
||||
func (ft *fakeTimer) Reset(in time.Duration) bool {
|
||||
ft.lock.Lock()
|
||||
defer ft.lock.Unlock()
|
||||
|
||||
was := ft.active
|
||||
ft.active = true
|
||||
ft.timeout = ft.now.Add(in)
|
||||
ft.updated <- timerUpdate{
|
||||
active: true,
|
||||
next: in,
|
||||
}
|
||||
return was
|
||||
}
|
||||
|
||||
func (ft *fakeTimer) Stop() bool {
|
||||
ft.lock.Lock()
|
||||
defer ft.lock.Unlock()
|
||||
|
||||
was := ft.active
|
||||
ft.active = false
|
||||
ft.updated <- timerUpdate{
|
||||
active: false,
|
||||
}
|
||||
return was
|
||||
}
|
||||
|
||||
func (ft *fakeTimer) Now() time.Time {
|
||||
ft.lock.Lock()
|
||||
defer ft.lock.Unlock()
|
||||
|
||||
return ft.now
|
||||
}
|
||||
|
||||
func (ft *fakeTimer) Remaining() time.Duration {
|
||||
ft.lock.Lock()
|
||||
defer ft.lock.Unlock()
|
||||
|
||||
return ft.timeout.Sub(ft.now)
|
||||
}
|
||||
|
||||
func (ft *fakeTimer) Since(t time.Time) time.Duration {
|
||||
ft.lock.Lock()
|
||||
defer ft.lock.Unlock()
|
||||
|
||||
return ft.now.Sub(t)
|
||||
}
|
||||
|
||||
func (ft *fakeTimer) Sleep(d time.Duration) {
|
||||
// ft.advance grabs ft.lock
|
||||
ft.advance(d)
|
||||
}
|
||||
|
||||
// advance the current time.
|
||||
func (ft *fakeTimer) advance(d time.Duration) {
|
||||
ft.lock.Lock()
|
||||
defer ft.lock.Unlock()
|
||||
|
||||
ft.now = ft.now.Add(d)
|
||||
if ft.active && !ft.now.Before(ft.timeout) {
|
||||
ft.active = false
|
||||
ft.c <- ft.timeout
|
||||
func newReceiver() *receiver {
|
||||
return &receiver{
|
||||
counterCh: make(chan int),
|
||||
}
|
||||
}
|
||||
|
||||
// return the calling line number (for printing)
|
||||
// test the timer's state
|
||||
func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
|
||||
t.Helper()
|
||||
if upd.active != active {
|
||||
t.Fatalf("%s: expected timer active=%v", name, active)
|
||||
}
|
||||
if active && upd.next != next {
|
||||
t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
|
||||
}
|
||||
func (r *receiver) calls() <-chan int {
|
||||
return r.counterCh
|
||||
}
|
||||
|
||||
// test and reset the receiver's state
|
||||
func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
|
||||
t.Helper()
|
||||
triggered := receiver.reset()
|
||||
if expected && !triggered {
|
||||
t.Fatalf("%s: function should have been called", name)
|
||||
} else if !expected && triggered {
|
||||
t.Fatalf("%s: function should not have been called", name)
|
||||
}
|
||||
func (r *receiver) setReturnValue(err error) {
|
||||
r.resultMu.Lock()
|
||||
defer r.resultMu.Unlock()
|
||||
r.result = err
|
||||
}
|
||||
|
||||
// Durations embedded in test cases depend on these.
|
||||
var minInterval = 1 * time.Second
|
||||
var retryInterval = 5 * time.Second
|
||||
var maxInterval = 10 * time.Second
|
||||
|
||||
func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
|
||||
t.Helper()
|
||||
upd := <-timer.updated // wait for stop
|
||||
checkReceiver(name, t, obj, expectCall)
|
||||
checkReceiver(name, t, obj, false) // prove post-condition
|
||||
checkTimer(name, t, upd, false, 0)
|
||||
upd = <-timer.updated // wait for reset
|
||||
checkTimer(name, t, upd, true, expectNext)
|
||||
}
|
||||
|
||||
func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
|
||||
t.Helper()
|
||||
waitForReset(name, t, timer, obj, true, maxInterval)
|
||||
}
|
||||
|
||||
func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
|
||||
t.Helper()
|
||||
waitForReset(name, t, timer, obj, true, expectNext)
|
||||
}
|
||||
|
||||
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
|
||||
t.Helper()
|
||||
waitForReset(name, t, timer, obj, false, expectNext)
|
||||
}
|
||||
|
||||
func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
|
||||
// assertCalls waits for the receiver's function to be called and asserts that
|
||||
// the total call count matches expectedCalls. It fails the test if the timeout is reached
|
||||
// or if the call count doesn't match.
|
||||
func assertCalls(t *testing.T, r *receiver, expectedCalls int) {
|
||||
t.Helper()
|
||||
select {
|
||||
case <-timer.c:
|
||||
t.Fatalf("%s: unexpected timer tick", name)
|
||||
case upd := <-timer.updated:
|
||||
t.Fatalf("%s: unexpected timer update %v", name, upd)
|
||||
default:
|
||||
case calls := <-r.calls():
|
||||
if calls != expectedCalls {
|
||||
t.Fatalf("expected %d calls, but got %d", expectedCalls, calls)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timed out waiting for function execution (expected %d calls, got %d)", expectedCalls, r.counter.Load())
|
||||
}
|
||||
}
|
||||
|
||||
// assertNoCalls waits for 100 millisecond and asserts that the receiver's
|
||||
// function was *not* called during that time. It fails the test if a call is detected.
|
||||
func assertNoCalls(t *testing.T, r *receiver) {
|
||||
t.Helper()
|
||||
select {
|
||||
case calls := <-r.calls():
|
||||
t.Fatalf("unexpected function execution detected (call count: %d)", calls)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
checkReceiver(name, t, obj, false)
|
||||
}
|
||||
|
||||
func Test_BoundedFrequencyRunner(t *testing.T) {
|
||||
obj := &receiver{}
|
||||
timer := newFakeTimer()
|
||||
runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, timer)
|
||||
var minInterval = 1 * time.Second
|
||||
var retryInterval = 5 * time.Second
|
||||
var maxInterval = 10 * time.Second
|
||||
obj := newReceiver()
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, fakeClock)
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
var upd timerUpdate
|
||||
|
||||
// Start.
|
||||
go runner.Loop(stop)
|
||||
upd = <-timer.updated // wait for initial time to be set to max
|
||||
checkTimer("init", t, upd, true, maxInterval)
|
||||
checkReceiver("init", t, obj, false)
|
||||
|
||||
// Run once, immediately.
|
||||
// rel=0ms
|
||||
runner.Run()
|
||||
waitForRun("first run", t, timer, obj)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(500 * time.Millisecond) // rel=500ms
|
||||
assertCalls(t, obj, 1)
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
// Run again, before minInterval expires. No execution expected.
|
||||
fakeClock.Step(500 * time.Millisecond) // rel=500ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(499 * time.Millisecond) // rel=999ms
|
||||
// Run again, before minInterval expires. No execution expected.
|
||||
fakeClock.Step(499 * time.Millisecond) // rel=999ms
|
||||
runner.Run()
|
||||
waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Do the deferred run
|
||||
timer.advance(1 * time.Millisecond) // rel=1000ms
|
||||
waitForRun("second run", t, timer, obj)
|
||||
fakeClock.Step(1 * time.Millisecond) // rel=1000ms
|
||||
assertCalls(t, obj, 2)
|
||||
|
||||
// Try again immediately
|
||||
runner.Run()
|
||||
waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(1 * time.Millisecond) // rel=1ms
|
||||
// Run again, before minInterval expires. No execution expected.
|
||||
fakeClock.Step(1 * time.Millisecond) // rel=1ms
|
||||
runner.Run()
|
||||
waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Ensure that we don't run again early
|
||||
timer.advance(998 * time.Millisecond) // rel=999ms
|
||||
waitForNothing("premature", t, timer, obj)
|
||||
// Ensure that we don't run again early. No execution expected.
|
||||
fakeClock.Step(998 * time.Millisecond) // rel=999ms
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Do the deferred run
|
||||
timer.advance(1 * time.Millisecond) // rel=1000ms
|
||||
waitForRun("third run", t, timer, obj)
|
||||
|
||||
// Let minInterval pass, but there are no runs queued
|
||||
timer.advance(1 * time.Second) // rel=1000ms
|
||||
waitForNothing("minInterval", t, timer, obj)
|
||||
fakeClock.Step(1 * time.Millisecond) // rel=1000ms
|
||||
assertCalls(t, obj, 3)
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
// Let minInterval pass, but there are no runs queued. No execution expected.
|
||||
fakeClock.Step(1 * time.Second) // rel=1000ms
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Let maxInterval pass
|
||||
timer.advance(9 * time.Second) // rel=10000ms
|
||||
waitForRun("maxInterval", t, timer, obj)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(1 * time.Millisecond) // rel=1ms
|
||||
fakeClock.Step(maxInterval) // rel=10000ms
|
||||
assertCalls(t, obj, 4)
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
// Run again, before minInterval expires. No execution expected.
|
||||
fakeClock.Step(1 * time.Millisecond) // rel=1ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Let minInterval pass
|
||||
timer.advance(999 * time.Millisecond) // rel=1000ms
|
||||
waitForRun("fifth run", t, timer, obj)
|
||||
|
||||
// Clean up.
|
||||
stop <- struct{}{}
|
||||
// a message is sent to time.updated in func Stop() at the end of the child goroutine
|
||||
// to terminate the child, a receive on time.updated is needed here
|
||||
<-timer.updated
|
||||
fakeClock.Step(999 * time.Millisecond) // rel=1000ms
|
||||
assertCalls(t, obj, 5)
|
||||
}
|
||||
|
||||
func Test_BoundedFrequencyRunnerRetry(t *testing.T) {
|
||||
obj := &receiver{}
|
||||
timer := newFakeTimer()
|
||||
runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, timer)
|
||||
var minInterval = 1 * time.Second
|
||||
var retryInterval = 5 * time.Second
|
||||
var maxInterval = 10 * time.Second
|
||||
obj := newReceiver()
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
runner := construct("test-runner", obj.F, minInterval, retryInterval, maxInterval, fakeClock)
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
var upd timerUpdate
|
||||
|
||||
// Start.
|
||||
go runner.Loop(stop)
|
||||
upd = <-timer.updated // wait for initial time to be set to max
|
||||
checkTimer("init", t, upd, true, maxInterval)
|
||||
checkReceiver("init", t, obj, false)
|
||||
|
||||
// Run once, immediately, and queue a retry
|
||||
// rel=0ms
|
||||
obj.setRetry(true)
|
||||
obj.setReturnValue(fmt.Errorf("sync error"))
|
||||
runner.Run()
|
||||
waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
|
||||
assertCalls(t, obj, 1)
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// next run will succeed
|
||||
obj.setReturnValue(nil)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Nothing happens...
|
||||
timer.advance(time.Second) // rel=1000ms
|
||||
waitForNothing("minInterval, nothing queued", t, timer, obj)
|
||||
fakeClock.Step(minInterval) // rel=1000ms
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// After retryInterval, function is called
|
||||
timer.advance(4 * time.Second) // rel=5000ms
|
||||
waitForRun("retry", t, timer, obj)
|
||||
fakeClock.Step(4 * time.Second) // rel=5000ms
|
||||
assertCalls(t, obj, 2)
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(499 * time.Millisecond) // rel=499ms
|
||||
// Run again, before minInterval expires and trigger a retry
|
||||
fakeClock.Step(499 * time.Millisecond) // rel=499ms
|
||||
obj.setReturnValue(fmt.Errorf("sync error"))
|
||||
runner.Run()
|
||||
waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Do the deferred run, have it queue another retry
|
||||
obj.setRetry(true)
|
||||
timer.advance(501 * time.Millisecond) // rel=1000ms
|
||||
waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
|
||||
// Do the deferred run, queue another retry after it returns
|
||||
fakeClock.Step(501 * time.Millisecond) // rel=1000ms
|
||||
assertCalls(t, obj, 3)
|
||||
|
||||
// next run will succeed
|
||||
obj.setReturnValue(nil)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Wait for minInterval to pass
|
||||
timer.advance(time.Second) // rel=1000ms
|
||||
waitForNothing("minInterval, nothing queued", t, timer, obj)
|
||||
fakeClock.Step(time.Second) // rel=1000ms
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Now do another run
|
||||
// Now do another successful that abort the retry
|
||||
runner.Run()
|
||||
waitForRun("third run", t, timer, obj)
|
||||
assertCalls(t, obj, 4)
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Retry was cancelled because we already ran
|
||||
timer.advance(4 * time.Second)
|
||||
waitForNothing("retry cancelled", t, timer, obj)
|
||||
fakeClock.Step(4 * time.Second)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Run and request a retry
|
||||
obj.setRetry(true)
|
||||
// New run will trigger a retry.
|
||||
obj.setReturnValue(fmt.Errorf("sync error"))
|
||||
runner.Run()
|
||||
waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
|
||||
assertCalls(t, obj, 5)
|
||||
for fakeClock.Waiters() != 2 { // wait for retryIntervalTimer
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// next run will succeed
|
||||
obj.setReturnValue(nil)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Call Run again before minInterval passes
|
||||
timer.advance(100 * time.Millisecond) // rel=100ms
|
||||
fakeClock.Step(100 * time.Millisecond) // rel=100ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Deferred run will run after minInterval passes
|
||||
timer.advance(900 * time.Millisecond) // rel=1000ms
|
||||
waitForRun("fifth run", t, timer, obj)
|
||||
fakeClock.Step(900 * time.Millisecond) // rel=1000ms
|
||||
assertCalls(t, obj, 6)
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Retry was cancelled because we already ran
|
||||
timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
|
||||
waitForNothing("retry cancelled", t, timer, obj)
|
||||
fakeClock.Step(4 * time.Second) // rel=4s since run, 5s since RetryAfter
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Rerun happens after maxInterval
|
||||
timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
|
||||
waitForNothing("premature", t, timer, obj)
|
||||
timer.advance(time.Second) // rel=10s since run
|
||||
waitForRun("maxInterval", t, timer, obj)
|
||||
fakeClock.Step(5 * time.Second) // rel=9s since run, 10s since RetryAfter
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Clean up.
|
||||
stop <- struct{}{}
|
||||
// a message is sent to time.updated in func Stop() at the end of the child goroutine
|
||||
// to terminate the child, a receive on time.updated is needed here
|
||||
<-timer.updated
|
||||
fakeClock.Step(time.Second) // rel=10s since run
|
||||
assertCalls(t, obj, 7)
|
||||
}
|
||||
|
||||
func Test_BoundedFrequencyRunnerRetryShorterThanMinInterval(t *testing.T) {
|
||||
var minInterval = 5 * time.Second
|
||||
var retryInterval = 1 * time.Second // Shorter than minInterval
|
||||
var maxInterval = 10 * time.Second
|
||||
obj := newReceiver()
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
runner := construct("test-runner-short-retry", obj.F, minInterval, retryInterval, maxInterval, fakeClock)
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
go runner.Loop(stop)
|
||||
|
||||
// Run once immediately and trigger a retry.
|
||||
// rel=0s
|
||||
obj.setReturnValue(fmt.Errorf("sync error"))
|
||||
runner.Run()
|
||||
assertCalls(t, obj, 1)
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
// next run will succeed
|
||||
obj.setReturnValue(nil)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Advance clock past retryInterval, but still within minInterval.
|
||||
// rel=1s
|
||||
fakeClock.Step(retryInterval)
|
||||
assertNoCalls(t, obj) // Still shouldn't run because minInterval hasn't passed since run 1 finished.
|
||||
|
||||
// Advance clock just before minInterval expires.
|
||||
// rel=4.999s
|
||||
fakeClock.Step(minInterval - retryInterval - 1*time.Millisecond)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Advance clock past minInterval. The retry should now trigger the run.
|
||||
// rel=5s
|
||||
fakeClock.Step(1 * time.Millisecond)
|
||||
assertCalls(t, obj, 2) // Run happens now, triggered by the earlier retry, respecting minInterval.
|
||||
// wait for the timers to be reset
|
||||
for fakeClock.Waiters() != 2 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
// Let maxInterval pass without any Run() or Retry() calls.
|
||||
fakeClock.Step(maxInterval) // rel=10s since run 2
|
||||
assertCalls(t, obj, 3)
|
||||
}
|
||||
|
||||
func TestBoundedFrequencyRunner_Run_RunsAgainAfterMinInterval_RealClock(t *testing.T) {
|
||||
// Use relatively short intervals for real clock testing
|
||||
var minInterval = 500 * time.Millisecond
|
||||
var retryInterval = 800 * time.Millisecond
|
||||
var maxInterval = 1500 * time.Millisecond
|
||||
obj := newReceiver()
|
||||
runner := NewBoundedFrequencyRunner("test-runner", obj.F, minInterval, retryInterval, maxInterval)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
go runner.Loop(stopCh)
|
||||
|
||||
runner.Run() // First run
|
||||
assertCalls(t, obj, 1)
|
||||
|
||||
time.Sleep(2 * minInterval)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
runner.Run() // Second run
|
||||
assertCalls(t, obj, 2)
|
||||
}
|
||||
|
||||
func TestBoundedFrequencyRunner_Run_DoesNotRunBeforeMinInterval_RealClock(t *testing.T) {
|
||||
// Use relatively short intervals for real clock testing
|
||||
var minInterval = 500 * time.Millisecond
|
||||
var retryInterval = 800 * time.Millisecond
|
||||
var maxInterval = 1500 * time.Millisecond
|
||||
obj := newReceiver()
|
||||
runner := NewBoundedFrequencyRunner("test-runner", obj.F, minInterval, retryInterval, maxInterval)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
go runner.Loop(stopCh)
|
||||
|
||||
runner.Run() // First run
|
||||
assertCalls(t, obj, 1)
|
||||
|
||||
time.Sleep(minInterval / 4)
|
||||
runner.Run()
|
||||
assertNoCalls(t, obj)
|
||||
}
|
||||
|
||||
func TestBoundedFrequencyRunner_RunAfterMaxInterval_RealClock(t *testing.T) {
|
||||
// Use relatively short intervals for real clock testing
|
||||
var minInterval = 100 * time.Millisecond
|
||||
var retryInterval = 200 * time.Millisecond
|
||||
var maxInterval = 500 * time.Millisecond
|
||||
obj := newReceiver()
|
||||
runner := NewBoundedFrequencyRunner("test-runner", obj.F, minInterval, retryInterval, maxInterval)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
go runner.Loop(stopCh)
|
||||
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
time.Sleep(maxInterval)
|
||||
assertCalls(t, obj, 1)
|
||||
}
|
||||
|
||||
func Test_BoundedFrequencyRunnerRetry_RealClock(t *testing.T) {
|
||||
// Use relatively short intervals for real clock testing
|
||||
var minInterval = 100 * time.Millisecond
|
||||
var retryInterval = 500 * time.Millisecond
|
||||
var maxInterval = 10 * time.Second
|
||||
|
||||
obj := newReceiver()
|
||||
// Use the real clock constructor
|
||||
runner := NewBoundedFrequencyRunner("test-runner-real-clock", obj.F, minInterval, retryInterval, maxInterval)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
go runner.Loop(stopCh)
|
||||
|
||||
t.Log("Triggering first retry")
|
||||
// Run once immediately and trigger a retry.
|
||||
// rel=0s
|
||||
obj.setReturnValue(fmt.Errorf("sync error"))
|
||||
runner.Run()
|
||||
assertCalls(t, obj, 1)
|
||||
|
||||
// Check before retryInterval
|
||||
time.Sleep(retryInterval / 4)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
// Check after retryInterval
|
||||
time.Sleep(retryInterval) // Wait past retryInterval
|
||||
assertCalls(t, obj, 2)
|
||||
|
||||
// Check after retryInterval (relative to the *first* Retry call in this batch)
|
||||
time.Sleep(retryInterval)
|
||||
assertCalls(t, obj, 3)
|
||||
|
||||
time.Sleep(retryInterval / 8)
|
||||
assertNoCalls(t, obj)
|
||||
|
||||
time.Sleep(retryInterval) // Wait past the new retryInterval
|
||||
assertCalls(t, obj, 4)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user