BoundedFrequencyRunner: add RetryAfter

This commit is contained in:
Dan Winship 2019-10-01 14:54:43 -04:00
parent 478fadccff
commit 4c5f4cb353
2 changed files with 202 additions and 20 deletions

View File

@ -40,6 +40,10 @@ type BoundedFrequencyRunner struct {
lastRun time.Time // time of last run
timer timer // timer for deferred runs
limiter rateLimiter // rate limiter for on-demand runs
retry chan struct{} // schedule a retry
retryMu sync.Mutex // guards retryTime
retryTime time.Time // when to retry
}
// designed so that flowcontrol.RateLimiter satisfies
@ -72,6 +76,9 @@ type timer interface {
// See time.Now.
Now() time.Time
// Remaining returns the time until the timer will go off (if it is running).
Remaining() time.Duration
// See time.Since.
Since(t time.Time) time.Duration
@ -81,26 +88,40 @@ type timer interface {
// implement our timer in terms of std time.Timer.
type realTimer struct {
*time.Timer
timer *time.Timer
next time.Time
}
func (rt realTimer) C() <-chan time.Time {
return rt.Timer.C
func (rt *realTimer) C() <-chan time.Time {
return rt.timer.C
}
func (rt realTimer) Now() time.Time {
func (rt *realTimer) Reset(d time.Duration) bool {
rt.next = time.Now().Add(d)
return rt.timer.Reset(d)
}
func (rt *realTimer) Stop() bool {
return rt.timer.Stop()
}
func (rt *realTimer) Now() time.Time {
return time.Now()
}
func (rt realTimer) Since(t time.Time) time.Duration {
func (rt *realTimer) Remaining() time.Duration {
return rt.next.Sub(time.Now())
}
func (rt *realTimer) Since(t time.Time) time.Duration {
return time.Since(t)
}
func (rt realTimer) Sleep(d time.Duration) {
func (rt *realTimer) Sleep(d time.Duration) {
time.Sleep(d)
}
var _ timer = realTimer{}
var _ timer = &realTimer{}
// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
// which will manage runs of the specified function.
@ -132,8 +153,8 @@ var _ timer = realTimer{}
// The maxInterval must be greater than or equal to the minInterval, If the
// caller passes a maxInterval less than minInterval, this function will panic.
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately
<-timer.C() // consume the first tick
timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
<-timer.C() // consume the first tick
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}
@ -152,6 +173,7 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b
minInterval: minInterval,
maxInterval: maxInterval,
run: make(chan struct{}, 1),
retry: make(chan struct{}, 1),
timer: timer,
}
if minInterval == 0 {
@ -179,6 +201,8 @@ func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
bfr.tryRun()
case <-bfr.run:
bfr.tryRun()
case <-bfr.retry:
bfr.doRetry()
}
}
}
@ -199,6 +223,36 @@ func (bfr *BoundedFrequencyRunner) Run() {
}
}
// RetryAfter ensures that the function will run again after no later than interval. This
// can be called from inside a run of the BoundedFrequencyRunner's function, or
// asynchronously.
func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration) {
// This could be called either with or without bfr.mu held, so we can't grab that
// lock, and therefore we can't update the timer directly.
// If the Loop thread is currently running fn then it may be a while before it
// processes our retry request. But we want to retry at interval from now, not at
// interval from "whenever doRetry eventually gets called". So we convert to
// absolute time.
retryTime := bfr.timer.Now().Add(interval)
// We can't just write retryTime to a channel because there could be multiple
// RetryAfter calls before Loop gets a chance to read from the channel. So we
// record the soonest requested retry time in bfr.retryTime and then only signal
// the Loop thread once, just like Run does.
bfr.retryMu.Lock()
defer bfr.retryMu.Unlock()
if !bfr.retryTime.IsZero() && bfr.retryTime.Before(retryTime) {
return
}
bfr.retryTime = retryTime
select {
case bfr.retry <- struct{}{}:
default:
}
}
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) stop() {
bfr.mu.Lock()
@ -207,6 +261,27 @@ func (bfr *BoundedFrequencyRunner) stop() {
bfr.timer.Stop()
}
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) doRetry() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
bfr.retryMu.Lock()
defer bfr.retryMu.Unlock()
if bfr.retryTime.IsZero() {
return
}
// Timer wants an interval not an absolute time, so convert retryTime back now
retryInterval := bfr.retryTime.Sub(bfr.timer.Now())
bfr.retryTime = time.Time{}
if retryInterval < bfr.timer.Remaining() {
klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval)
bfr.timer.Stop()
bfr.timer.Reset(retryInterval)
}
}
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
@ -223,17 +298,16 @@ func (bfr *BoundedFrequencyRunner) tryRun() {
}
// It can't run right now, figure out when it can run next.
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.timer.Remaining() // time to next scheduled run
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
// It's hard to avoid race conditions in the unit tests unless we always reset
// the timer here, even when it's unchanged
if nextPossible < nextScheduled {
// Set the timer for ASAP, but don't drain here. Assuming Loop is running,
// it might get a delivery in the mean time, but that is OK.
bfr.timer.Stop()
bfr.timer.Reset(nextPossible)
klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
nextScheduled = nextPossible
}
bfr.timer.Stop()
bfr.timer.Reset(nextScheduled)
}

View File

@ -24,14 +24,20 @@ import (
// Track calls to the managed function.
type receiver struct {
lock sync.Mutex
run bool
lock sync.Mutex
run bool
retryFn func()
}
func (r *receiver) F() {
r.lock.Lock()
defer r.lock.Unlock()
r.run = true
if r.retryFn != nil {
r.retryFn()
r.retryFn = nil
}
}
func (r *receiver) reset() bool {
@ -42,6 +48,12 @@ func (r *receiver) reset() bool {
return was
}
func (r *receiver) setRetryFn(retryFn func()) {
r.lock.Lock()
defer r.lock.Unlock()
r.retryFn = retryFn
}
// A single change event in the fake timer.
type timerUpdate struct {
active bool
@ -106,6 +118,13 @@ func (ft *fakeTimer) Now() time.Time {
return ft.now
}
func (ft *fakeTimer) Remaining() time.Duration {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.timeout.Sub(ft.now)
}
func (ft *fakeTimer) Since(t time.Time) time.Duration {
ft.lock.Lock()
defer ft.lock.Unlock()
@ -168,6 +187,12 @@ func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
waitForReset(name, t, timer, obj, true, maxInterval)
}
func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
// It will first get reset as with a normal run, and then get set again
waitForRun(name, t, timer, obj)
waitForReset(name, t, timer, obj, false, expectNext)
}
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
waitForReset(name, t, timer, obj, false, expectNext)
}
@ -334,3 +359,86 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
// Clean up.
stop <- struct{}{}
}
func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately, and queue a retry
// rel=0ms
obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
runner.Run()
waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
// Nothing happens...
timer.advance(time.Second) // rel=1000ms
waitForNothing("minInterval, nothing queued", t, timer, obj)
// After retryInterval, function is called
timer.advance(4 * time.Second) // rel=5000ms
waitForRun("retry", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(499 * time.Millisecond) // rel=499ms
runner.Run()
waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
// Do the deferred run, queue another retry after it returns
timer.advance(501 * time.Millisecond) // rel=1000ms
runner.RetryAfter(5 * time.Second)
waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
// Wait for minInterval to pass
timer.advance(time.Second) // rel=1000ms
waitForNothing("minInterval, nothing queued", t, timer, obj)
// Now do another run
runner.Run()
waitForRun("third run", t, timer, obj)
// Retry was cancelled because we already ran
timer.advance(4 * time.Second)
waitForNothing("retry cancelled", t, timer, obj)
// Run, queue a retry from a goroutine
obj.setRetryFn(func() {
go func() {
time.Sleep(100 * time.Millisecond)
runner.RetryAfter(5 * time.Second)
}()
})
runner.Run()
waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
// Call Run again before minInterval passes
timer.advance(100 * time.Millisecond) // rel=100ms
runner.Run()
waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
// Deferred run will run after minInterval passes
timer.advance(900 * time.Millisecond) // rel=1000ms
waitForRun("fifth run", t, timer, obj)
// Retry was cancelled because we already ran
timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
waitForNothing("retry cancelled", t, timer, obj)
// Rerun happens after maxInterval
timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
waitForNothing("premature", t, timer, obj)
timer.advance(time.Second) // rel=10s since run
waitForRun("maxInterval", t, timer, obj)
// Clean up.
stop <- struct{}{}
}