mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #83387 from danwinship/proxy-error-retry
If an iptables proxier sync fails, retry after iptablesSyncPeriod
This commit is contained in:
commit
85575e929b
@ -189,6 +189,7 @@ type Proxier struct {
|
|||||||
servicesSynced bool
|
servicesSynced bool
|
||||||
initialized int32
|
initialized int32
|
||||||
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
|
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
|
||||||
|
syncPeriod time.Duration
|
||||||
|
|
||||||
// These are effectively const and do not need the mutex to be held.
|
// These are effectively const and do not need the mutex to be held.
|
||||||
iptables utiliptables.Interface
|
iptables utiliptables.Interface
|
||||||
@ -296,6 +297,7 @@ func NewProxier(ipt utiliptables.Interface,
|
|||||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
|
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
|
||||||
endpointsMap: make(proxy.EndpointsMap),
|
endpointsMap: make(proxy.EndpointsMap),
|
||||||
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled),
|
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled),
|
||||||
|
syncPeriod: syncPeriod,
|
||||||
iptables: ipt,
|
iptables: ipt,
|
||||||
masqueradeAll: masqueradeAll,
|
masqueradeAll: masqueradeAll,
|
||||||
masqueradeMark: masqueradeMark,
|
masqueradeMark: masqueradeMark,
|
||||||
@ -717,6 +719,14 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
klog.V(3).Info("Syncing iptables rules")
|
klog.V(3).Info("Syncing iptables rules")
|
||||||
|
|
||||||
|
success := false
|
||||||
|
defer func() {
|
||||||
|
if !success {
|
||||||
|
klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod)
|
||||||
|
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Create and link the kube chains.
|
// Create and link the kube chains.
|
||||||
for _, jump := range iptablesJumpChains {
|
for _, jump := range iptablesJumpChains {
|
||||||
if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
|
if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
|
||||||
@ -1432,6 +1442,8 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
|
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
success = true
|
||||||
|
|
||||||
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
|
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
|
||||||
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
|
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
|
||||||
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
|
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
|
||||||
|
@ -40,6 +40,10 @@ type BoundedFrequencyRunner struct {
|
|||||||
lastRun time.Time // time of last run
|
lastRun time.Time // time of last run
|
||||||
timer timer // timer for deferred runs
|
timer timer // timer for deferred runs
|
||||||
limiter rateLimiter // rate limiter for on-demand runs
|
limiter rateLimiter // rate limiter for on-demand runs
|
||||||
|
|
||||||
|
retry chan struct{} // schedule a retry
|
||||||
|
retryMu sync.Mutex // guards retryTime
|
||||||
|
retryTime time.Time // when to retry
|
||||||
}
|
}
|
||||||
|
|
||||||
// designed so that flowcontrol.RateLimiter satisfies
|
// designed so that flowcontrol.RateLimiter satisfies
|
||||||
@ -72,6 +76,9 @@ type timer interface {
|
|||||||
// See time.Now.
|
// See time.Now.
|
||||||
Now() time.Time
|
Now() time.Time
|
||||||
|
|
||||||
|
// Remaining returns the time until the timer will go off (if it is running).
|
||||||
|
Remaining() time.Duration
|
||||||
|
|
||||||
// See time.Since.
|
// See time.Since.
|
||||||
Since(t time.Time) time.Duration
|
Since(t time.Time) time.Duration
|
||||||
|
|
||||||
@ -81,26 +88,40 @@ type timer interface {
|
|||||||
|
|
||||||
// implement our timer in terms of std time.Timer.
|
// implement our timer in terms of std time.Timer.
|
||||||
type realTimer struct {
|
type realTimer struct {
|
||||||
*time.Timer
|
timer *time.Timer
|
||||||
|
next time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rt realTimer) C() <-chan time.Time {
|
func (rt *realTimer) C() <-chan time.Time {
|
||||||
return rt.Timer.C
|
return rt.timer.C
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rt realTimer) Now() time.Time {
|
func (rt *realTimer) Reset(d time.Duration) bool {
|
||||||
|
rt.next = time.Now().Add(d)
|
||||||
|
return rt.timer.Reset(d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *realTimer) Stop() bool {
|
||||||
|
return rt.timer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *realTimer) Now() time.Time {
|
||||||
return time.Now()
|
return time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rt realTimer) Since(t time.Time) time.Duration {
|
func (rt *realTimer) Remaining() time.Duration {
|
||||||
|
return rt.next.Sub(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *realTimer) Since(t time.Time) time.Duration {
|
||||||
return time.Since(t)
|
return time.Since(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rt realTimer) Sleep(d time.Duration) {
|
func (rt *realTimer) Sleep(d time.Duration) {
|
||||||
time.Sleep(d)
|
time.Sleep(d)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ timer = realTimer{}
|
var _ timer = &realTimer{}
|
||||||
|
|
||||||
// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
|
// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
|
||||||
// which will manage runs of the specified function.
|
// which will manage runs of the specified function.
|
||||||
@ -132,7 +153,7 @@ var _ timer = realTimer{}
|
|||||||
// The maxInterval must be greater than or equal to the minInterval, If the
|
// The maxInterval must be greater than or equal to the minInterval, If the
|
||||||
// caller passes a maxInterval less than minInterval, this function will panic.
|
// caller passes a maxInterval less than minInterval, this function will panic.
|
||||||
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
|
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
|
||||||
timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately
|
timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
|
||||||
<-timer.C() // consume the first tick
|
<-timer.C() // consume the first tick
|
||||||
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
|
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
|
||||||
}
|
}
|
||||||
@ -152,6 +173,7 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b
|
|||||||
minInterval: minInterval,
|
minInterval: minInterval,
|
||||||
maxInterval: maxInterval,
|
maxInterval: maxInterval,
|
||||||
run: make(chan struct{}, 1),
|
run: make(chan struct{}, 1),
|
||||||
|
retry: make(chan struct{}, 1),
|
||||||
timer: timer,
|
timer: timer,
|
||||||
}
|
}
|
||||||
if minInterval == 0 {
|
if minInterval == 0 {
|
||||||
@ -179,6 +201,8 @@ func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
|
|||||||
bfr.tryRun()
|
bfr.tryRun()
|
||||||
case <-bfr.run:
|
case <-bfr.run:
|
||||||
bfr.tryRun()
|
bfr.tryRun()
|
||||||
|
case <-bfr.retry:
|
||||||
|
bfr.doRetry()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -199,6 +223,36 @@ func (bfr *BoundedFrequencyRunner) Run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RetryAfter ensures that the function will run again after no later than interval. This
|
||||||
|
// can be called from inside a run of the BoundedFrequencyRunner's function, or
|
||||||
|
// asynchronously.
|
||||||
|
func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration) {
|
||||||
|
// This could be called either with or without bfr.mu held, so we can't grab that
|
||||||
|
// lock, and therefore we can't update the timer directly.
|
||||||
|
|
||||||
|
// If the Loop thread is currently running fn then it may be a while before it
|
||||||
|
// processes our retry request. But we want to retry at interval from now, not at
|
||||||
|
// interval from "whenever doRetry eventually gets called". So we convert to
|
||||||
|
// absolute time.
|
||||||
|
retryTime := bfr.timer.Now().Add(interval)
|
||||||
|
|
||||||
|
// We can't just write retryTime to a channel because there could be multiple
|
||||||
|
// RetryAfter calls before Loop gets a chance to read from the channel. So we
|
||||||
|
// record the soonest requested retry time in bfr.retryTime and then only signal
|
||||||
|
// the Loop thread once, just like Run does.
|
||||||
|
bfr.retryMu.Lock()
|
||||||
|
defer bfr.retryMu.Unlock()
|
||||||
|
if !bfr.retryTime.IsZero() && bfr.retryTime.Before(retryTime) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bfr.retryTime = retryTime
|
||||||
|
|
||||||
|
select {
|
||||||
|
case bfr.retry <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// assumes the lock is not held
|
// assumes the lock is not held
|
||||||
func (bfr *BoundedFrequencyRunner) stop() {
|
func (bfr *BoundedFrequencyRunner) stop() {
|
||||||
bfr.mu.Lock()
|
bfr.mu.Lock()
|
||||||
@ -207,6 +261,27 @@ func (bfr *BoundedFrequencyRunner) stop() {
|
|||||||
bfr.timer.Stop()
|
bfr.timer.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assumes the lock is not held
|
||||||
|
func (bfr *BoundedFrequencyRunner) doRetry() {
|
||||||
|
bfr.mu.Lock()
|
||||||
|
defer bfr.mu.Unlock()
|
||||||
|
bfr.retryMu.Lock()
|
||||||
|
defer bfr.retryMu.Unlock()
|
||||||
|
|
||||||
|
if bfr.retryTime.IsZero() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timer wants an interval not an absolute time, so convert retryTime back now
|
||||||
|
retryInterval := bfr.retryTime.Sub(bfr.timer.Now())
|
||||||
|
bfr.retryTime = time.Time{}
|
||||||
|
if retryInterval < bfr.timer.Remaining() {
|
||||||
|
klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval)
|
||||||
|
bfr.timer.Stop()
|
||||||
|
bfr.timer.Reset(retryInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// assumes the lock is not held
|
// assumes the lock is not held
|
||||||
func (bfr *BoundedFrequencyRunner) tryRun() {
|
func (bfr *BoundedFrequencyRunner) tryRun() {
|
||||||
bfr.mu.Lock()
|
bfr.mu.Lock()
|
||||||
@ -223,17 +298,16 @@ func (bfr *BoundedFrequencyRunner) tryRun() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// It can't run right now, figure out when it can run next.
|
// It can't run right now, figure out when it can run next.
|
||||||
|
|
||||||
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
|
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
|
||||||
nextPossible := bfr.minInterval - elapsed // time to next possible run
|
nextPossible := bfr.minInterval - elapsed // time to next possible run
|
||||||
nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
|
nextScheduled := bfr.timer.Remaining() // time to next scheduled run
|
||||||
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
|
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
|
||||||
|
|
||||||
|
// It's hard to avoid race conditions in the unit tests unless we always reset
|
||||||
|
// the timer here, even when it's unchanged
|
||||||
if nextPossible < nextScheduled {
|
if nextPossible < nextScheduled {
|
||||||
// Set the timer for ASAP, but don't drain here. Assuming Loop is running,
|
nextScheduled = nextPossible
|
||||||
// it might get a delivery in the mean time, but that is OK.
|
|
||||||
bfr.timer.Stop()
|
|
||||||
bfr.timer.Reset(nextPossible)
|
|
||||||
klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
|
|
||||||
}
|
}
|
||||||
|
bfr.timer.Stop()
|
||||||
|
bfr.timer.Reset(nextScheduled)
|
||||||
}
|
}
|
||||||
|
@ -26,12 +26,18 @@ import (
|
|||||||
type receiver struct {
|
type receiver struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
run bool
|
run bool
|
||||||
|
retryFn func()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *receiver) F() {
|
func (r *receiver) F() {
|
||||||
r.lock.Lock()
|
r.lock.Lock()
|
||||||
defer r.lock.Unlock()
|
defer r.lock.Unlock()
|
||||||
r.run = true
|
r.run = true
|
||||||
|
|
||||||
|
if r.retryFn != nil {
|
||||||
|
r.retryFn()
|
||||||
|
r.retryFn = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *receiver) reset() bool {
|
func (r *receiver) reset() bool {
|
||||||
@ -42,6 +48,12 @@ func (r *receiver) reset() bool {
|
|||||||
return was
|
return was
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *receiver) setRetryFn(retryFn func()) {
|
||||||
|
r.lock.Lock()
|
||||||
|
defer r.lock.Unlock()
|
||||||
|
r.retryFn = retryFn
|
||||||
|
}
|
||||||
|
|
||||||
// A single change event in the fake timer.
|
// A single change event in the fake timer.
|
||||||
type timerUpdate struct {
|
type timerUpdate struct {
|
||||||
active bool
|
active bool
|
||||||
@ -54,6 +66,7 @@ type fakeTimer struct {
|
|||||||
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
now time.Time
|
now time.Time
|
||||||
|
timeout time.Time
|
||||||
active bool
|
active bool
|
||||||
|
|
||||||
updated chan timerUpdate
|
updated chan timerUpdate
|
||||||
@ -61,7 +74,7 @@ type fakeTimer struct {
|
|||||||
|
|
||||||
func newFakeTimer() *fakeTimer {
|
func newFakeTimer() *fakeTimer {
|
||||||
ft := &fakeTimer{
|
ft := &fakeTimer{
|
||||||
now: time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC),
|
now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||||
c: make(chan time.Time),
|
c: make(chan time.Time),
|
||||||
updated: make(chan timerUpdate),
|
updated: make(chan timerUpdate),
|
||||||
}
|
}
|
||||||
@ -78,6 +91,7 @@ func (ft *fakeTimer) Reset(in time.Duration) bool {
|
|||||||
|
|
||||||
was := ft.active
|
was := ft.active
|
||||||
ft.active = true
|
ft.active = true
|
||||||
|
ft.timeout = ft.now.Add(in)
|
||||||
ft.updated <- timerUpdate{
|
ft.updated <- timerUpdate{
|
||||||
active: true,
|
active: true,
|
||||||
next: in,
|
next: in,
|
||||||
@ -104,6 +118,13 @@ func (ft *fakeTimer) Now() time.Time {
|
|||||||
return ft.now
|
return ft.now
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ft *fakeTimer) Remaining() time.Duration {
|
||||||
|
ft.lock.Lock()
|
||||||
|
defer ft.lock.Unlock()
|
||||||
|
|
||||||
|
return ft.timeout.Sub(ft.now)
|
||||||
|
}
|
||||||
|
|
||||||
func (ft *fakeTimer) Since(t time.Time) time.Duration {
|
func (ft *fakeTimer) Since(t time.Time) time.Duration {
|
||||||
ft.lock.Lock()
|
ft.lock.Lock()
|
||||||
defer ft.lock.Unlock()
|
defer ft.lock.Unlock()
|
||||||
@ -112,9 +133,7 @@ func (ft *fakeTimer) Since(t time.Time) time.Duration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ft *fakeTimer) Sleep(d time.Duration) {
|
func (ft *fakeTimer) Sleep(d time.Duration) {
|
||||||
ft.lock.Lock()
|
// ft.advance grabs ft.lock
|
||||||
defer ft.lock.Unlock()
|
|
||||||
|
|
||||||
ft.advance(d)
|
ft.advance(d)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,15 +143,10 @@ func (ft *fakeTimer) advance(d time.Duration) {
|
|||||||
defer ft.lock.Unlock()
|
defer ft.lock.Unlock()
|
||||||
|
|
||||||
ft.now = ft.now.Add(d)
|
ft.now = ft.now.Add(d)
|
||||||
}
|
if ft.active && !ft.now.Before(ft.timeout) {
|
||||||
|
|
||||||
// send a timer tick.
|
|
||||||
func (ft *fakeTimer) tick() {
|
|
||||||
ft.lock.Lock()
|
|
||||||
defer ft.lock.Unlock()
|
|
||||||
|
|
||||||
ft.active = false
|
ft.active = false
|
||||||
ft.c <- ft.now
|
ft.c <- ft.timeout
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the calling line number (for printing)
|
// return the calling line number (for printing)
|
||||||
@ -173,10 +187,27 @@ func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
|
|||||||
waitForReset(name, t, timer, obj, true, maxInterval)
|
waitForReset(name, t, timer, obj, true, maxInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
|
||||||
|
// It will first get reset as with a normal run, and then get set again
|
||||||
|
waitForRun(name, t, timer, obj)
|
||||||
|
waitForReset(name, t, timer, obj, false, expectNext)
|
||||||
|
}
|
||||||
|
|
||||||
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
|
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
|
||||||
waitForReset(name, t, timer, obj, false, expectNext)
|
waitForReset(name, t, timer, obj, false, expectNext)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
|
||||||
|
select {
|
||||||
|
case <-timer.c:
|
||||||
|
t.Fatalf("%s: unexpected timer tick", name)
|
||||||
|
case upd := <-timer.updated:
|
||||||
|
t.Fatalf("%s: unexpected timer update %v", name, upd)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
checkReceiver(name, t, obj, false)
|
||||||
|
}
|
||||||
|
|
||||||
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
|
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
|
||||||
obj := &receiver{}
|
obj := &receiver{}
|
||||||
timer := newFakeTimer()
|
timer := newFakeTimer()
|
||||||
@ -206,13 +237,11 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
|
|||||||
runner.Run()
|
runner.Run()
|
||||||
waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
|
waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
|
||||||
|
|
||||||
// Run again, once minInterval has passed (race with timer).
|
// Do the deferred run
|
||||||
timer.advance(1 * time.Millisecond) // rel=1000ms
|
timer.advance(1 * time.Millisecond) // rel=1000ms
|
||||||
runner.Run()
|
|
||||||
waitForRun("second run", t, timer, obj)
|
waitForRun("second run", t, timer, obj)
|
||||||
|
|
||||||
// Run again, before minInterval expires.
|
// Try again immediately
|
||||||
// rel=0ms
|
|
||||||
runner.Run()
|
runner.Run()
|
||||||
waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
|
waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
|
||||||
|
|
||||||
@ -221,30 +250,30 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
|
|||||||
runner.Run()
|
runner.Run()
|
||||||
waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
|
waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
|
||||||
|
|
||||||
// Let the timer tick prematurely.
|
// Ensure that we don't run again early
|
||||||
timer.advance(998 * time.Millisecond) // rel=999ms
|
timer.advance(998 * time.Millisecond) // rel=999ms
|
||||||
timer.tick()
|
waitForNothing("premature", t, timer, obj)
|
||||||
waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond)
|
|
||||||
|
|
||||||
// Let the timer tick.
|
// Do the deferred run
|
||||||
timer.advance(1 * time.Millisecond) // rel=1000ms
|
timer.advance(1 * time.Millisecond) // rel=1000ms
|
||||||
timer.tick()
|
waitForRun("third run", t, timer, obj)
|
||||||
waitForRun("first tick", t, timer, obj)
|
|
||||||
|
|
||||||
// Let the timer tick.
|
// Let minInterval pass, but there are no runs queued
|
||||||
timer.advance(10 * time.Second) // rel=10000ms
|
timer.advance(1 * time.Second) // rel=1000ms
|
||||||
timer.tick()
|
waitForNothing("minInterval", t, timer, obj)
|
||||||
waitForRun("second tick", t, timer, obj)
|
|
||||||
|
// Let maxInterval pass
|
||||||
|
timer.advance(9 * time.Second) // rel=10000ms
|
||||||
|
waitForRun("maxInterval", t, timer, obj)
|
||||||
|
|
||||||
// Run again, before minInterval expires.
|
// Run again, before minInterval expires.
|
||||||
timer.advance(1 * time.Millisecond) // rel=1ms
|
timer.advance(1 * time.Millisecond) // rel=1ms
|
||||||
runner.Run()
|
runner.Run()
|
||||||
waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond)
|
waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)
|
||||||
|
|
||||||
// Let the timer tick.
|
// Let minInterval pass
|
||||||
timer.advance(999 * time.Millisecond) // rel=1000ms
|
timer.advance(999 * time.Millisecond) // rel=1000ms
|
||||||
timer.tick()
|
waitForRun("fourth run", t, timer, obj)
|
||||||
waitForRun("third tick", t, timer, obj)
|
|
||||||
|
|
||||||
// Clean up.
|
// Clean up.
|
||||||
stop <- struct{}{}
|
stop <- struct{}{}
|
||||||
@ -289,8 +318,10 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
|
|||||||
runner.Run()
|
runner.Run()
|
||||||
waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
|
waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
|
||||||
|
|
||||||
// Run again, once burst has replenished.
|
// Advance timer enough to replenish bursts, but not enough to be minInterval
|
||||||
|
// after the last run
|
||||||
timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
|
timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
|
||||||
|
waitForNothing("not minInterval", t, timer, obj)
|
||||||
runner.Run()
|
runner.Run()
|
||||||
waitForRun("third run", t, timer, obj)
|
waitForRun("third run", t, timer, obj)
|
||||||
|
|
||||||
@ -304,9 +335,8 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
|
|||||||
runner.Run()
|
runner.Run()
|
||||||
waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
|
waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
|
||||||
|
|
||||||
// Run again, once burst has replenished.
|
// Advance and do the deferred run
|
||||||
timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
|
timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
|
||||||
runner.Run()
|
|
||||||
waitForRun("fourth run", t, timer, obj)
|
waitForRun("fourth run", t, timer, obj)
|
||||||
|
|
||||||
// Run again, once burst has fully replenished.
|
// Run again, once burst has fully replenished.
|
||||||
@ -318,15 +348,96 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
|
|||||||
runner.Run()
|
runner.Run()
|
||||||
waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
|
waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
|
||||||
|
|
||||||
// Let the timer tick.
|
// Wait until minInterval after the last run
|
||||||
timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
|
timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
|
||||||
timer.tick()
|
waitForRun("seventh run", t, timer, obj)
|
||||||
waitForRun("first tick", t, timer, obj)
|
|
||||||
|
|
||||||
// Let the timer tick.
|
// Wait for maxInterval
|
||||||
timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
|
timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
|
||||||
timer.tick()
|
waitForRun("maxInterval", t, timer, obj)
|
||||||
waitForRun("second tick", t, timer, obj)
|
|
||||||
|
// Clean up.
|
||||||
|
stop <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
|
||||||
|
obj := &receiver{}
|
||||||
|
timer := newFakeTimer()
|
||||||
|
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
|
||||||
|
stop := make(chan struct{})
|
||||||
|
|
||||||
|
var upd timerUpdate
|
||||||
|
|
||||||
|
// Start.
|
||||||
|
go runner.Loop(stop)
|
||||||
|
upd = <-timer.updated // wait for initial time to be set to max
|
||||||
|
checkTimer("init", t, upd, true, maxInterval)
|
||||||
|
checkReceiver("init", t, obj, false)
|
||||||
|
|
||||||
|
// Run once, immediately, and queue a retry
|
||||||
|
// rel=0ms
|
||||||
|
obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
|
||||||
|
runner.Run()
|
||||||
|
waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)
|
||||||
|
|
||||||
|
// Nothing happens...
|
||||||
|
timer.advance(time.Second) // rel=1000ms
|
||||||
|
waitForNothing("minInterval, nothing queued", t, timer, obj)
|
||||||
|
|
||||||
|
// After retryInterval, function is called
|
||||||
|
timer.advance(4 * time.Second) // rel=5000ms
|
||||||
|
waitForRun("retry", t, timer, obj)
|
||||||
|
|
||||||
|
// Run again, before minInterval expires.
|
||||||
|
timer.advance(499 * time.Millisecond) // rel=499ms
|
||||||
|
runner.Run()
|
||||||
|
waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)
|
||||||
|
|
||||||
|
// Do the deferred run, queue another retry after it returns
|
||||||
|
timer.advance(501 * time.Millisecond) // rel=1000ms
|
||||||
|
runner.RetryAfter(5 * time.Second)
|
||||||
|
waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)
|
||||||
|
|
||||||
|
// Wait for minInterval to pass
|
||||||
|
timer.advance(time.Second) // rel=1000ms
|
||||||
|
waitForNothing("minInterval, nothing queued", t, timer, obj)
|
||||||
|
|
||||||
|
// Now do another run
|
||||||
|
runner.Run()
|
||||||
|
waitForRun("third run", t, timer, obj)
|
||||||
|
|
||||||
|
// Retry was cancelled because we already ran
|
||||||
|
timer.advance(4 * time.Second)
|
||||||
|
waitForNothing("retry cancelled", t, timer, obj)
|
||||||
|
|
||||||
|
// Run, queue a retry from a goroutine
|
||||||
|
obj.setRetryFn(func() {
|
||||||
|
go func() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
runner.RetryAfter(5 * time.Second)
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
runner.Run()
|
||||||
|
waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)
|
||||||
|
|
||||||
|
// Call Run again before minInterval passes
|
||||||
|
timer.advance(100 * time.Millisecond) // rel=100ms
|
||||||
|
runner.Run()
|
||||||
|
waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)
|
||||||
|
|
||||||
|
// Deferred run will run after minInterval passes
|
||||||
|
timer.advance(900 * time.Millisecond) // rel=1000ms
|
||||||
|
waitForRun("fifth run", t, timer, obj)
|
||||||
|
|
||||||
|
// Retry was cancelled because we already ran
|
||||||
|
timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
|
||||||
|
waitForNothing("retry cancelled", t, timer, obj)
|
||||||
|
|
||||||
|
// Rerun happens after maxInterval
|
||||||
|
timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
|
||||||
|
waitForNothing("premature", t, timer, obj)
|
||||||
|
timer.advance(time.Second) // rel=10s since run
|
||||||
|
waitForRun("maxInterval", t, timer, obj)
|
||||||
|
|
||||||
// Clean up.
|
// Clean up.
|
||||||
stop <- struct{}{}
|
stop <- struct{}{}
|
||||||
|
Loading…
Reference in New Issue
Block a user