From 478fadccffeb4ab8bf0f61f6a30a54078cc7d328 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 1 Oct 2019 14:05:05 -0400 Subject: [PATCH 1/3] BoundedFrequencyRunner: fix tests The tests were using a fake timer that only ticked when the test cases told it to, so it would only be correctly testing the BoundedFrequencyRunner functionality if the test cases made it tick whenever the BFR timer was supposed to expire, and didn't make it tick at any other time. But they didn't do that. Fix it to tick automatically at the correct times, and update the test cases accordingly (including adding a new helper method for asserting that the runner did nothing in cases when it's expected to have done nothing). Also fix two unrelated minor bugs in fakeTimer. --- .../async/bounded_frequency_runner_test.go | 89 ++++++++++--------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/pkg/util/async/bounded_frequency_runner_test.go b/pkg/util/async/bounded_frequency_runner_test.go index f21bf58b1af..cb59ed32fd3 100644 --- a/pkg/util/async/bounded_frequency_runner_test.go +++ b/pkg/util/async/bounded_frequency_runner_test.go @@ -52,16 +52,17 @@ type timerUpdate struct { type fakeTimer struct { c chan time.Time - lock sync.Mutex - now time.Time - active bool + lock sync.Mutex + now time.Time + timeout time.Time + active bool updated chan timerUpdate } func newFakeTimer() *fakeTimer { ft := &fakeTimer{ - now: time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC), + now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), c: make(chan time.Time), updated: make(chan timerUpdate), } @@ -78,6 +79,7 @@ func (ft *fakeTimer) Reset(in time.Duration) bool { was := ft.active ft.active = true + ft.timeout = ft.now.Add(in) ft.updated <- timerUpdate{ active: true, next: in, @@ -112,9 +114,7 @@ func (ft *fakeTimer) Since(t time.Time) time.Duration { } func (ft *fakeTimer) Sleep(d time.Duration) { - ft.lock.Lock() - defer ft.lock.Unlock() - + // ft.advance grabs ft.lock ft.advance(d) } @@ -124,15 +124,10 @@ func (ft *fakeTimer) advance(d time.Duration) { defer ft.lock.Unlock() ft.now = ft.now.Add(d) -} - -// send a timer tick. -func (ft *fakeTimer) tick() { - ft.lock.Lock() - defer ft.lock.Unlock() - - ft.active = false - ft.c <- ft.now + if ft.active && !ft.now.Before(ft.timeout) { + ft.active = false + ft.c <- ft.timeout + } } // return the calling line number (for printing) @@ -177,6 +172,17 @@ func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, ex waitForReset(name, t, timer, obj, false, expectNext) } +func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) { + select { + case <-timer.c: + t.Fatalf("%s: unexpected timer tick", name) + case upd := <-timer.updated: + t.Fatalf("%s: unexpected timer update %v", name, upd) + default: + } + checkReceiver(name, t, obj, false) +} + func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { obj := &receiver{} timer := newFakeTimer() @@ -206,13 +212,11 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { runner.Run() waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond) - // Run again, once minInterval has passed (race with timer). + // Do the deferred run timer.advance(1 * time.Millisecond) // rel=1000ms - runner.Run() waitForRun("second run", t, timer, obj) - // Run again, before minInterval expires. - // rel=0ms + // Try again immediately runner.Run() waitForDefer("too soon after second", t, timer, obj, 1*time.Second) @@ -221,30 +225,30 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { runner.Run() waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond) - // Let the timer tick prematurely. + // Ensure that we don't run again early timer.advance(998 * time.Millisecond) // rel=999ms - timer.tick() - waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond) + waitForNothing("premature", t, timer, obj) - // Let the timer tick. + // Do the deferred run timer.advance(1 * time.Millisecond) // rel=1000ms - timer.tick() - waitForRun("first tick", t, timer, obj) + waitForRun("third run", t, timer, obj) - // Let the timer tick. - timer.advance(10 * time.Second) // rel=10000ms - timer.tick() - waitForRun("second tick", t, timer, obj) + // Let minInterval pass, but there are no runs queued + timer.advance(1 * time.Second) // rel=1000ms + waitForNothing("minInterval", t, timer, obj) + + // Let maxInterval pass + timer.advance(9 * time.Second) // rel=10000ms + waitForRun("maxInterval", t, timer, obj) // Run again, before minInterval expires. timer.advance(1 * time.Millisecond) // rel=1ms runner.Run() - waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond) + waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond) - // Let the timer tick. + // Let minInterval pass timer.advance(999 * time.Millisecond) // rel=1000ms - timer.tick() - waitForRun("third tick", t, timer, obj) + waitForRun("fourth run", t, timer, obj) // Clean up. stop <- struct{}{} @@ -289,8 +293,10 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) { runner.Run() waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond) - // Run again, once burst has replenished. + // Advance timer enough to replenish bursts, but not enough to be minInterval + // after the last run timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms + waitForNothing("not minInterval", t, timer, obj) runner.Run() waitForRun("third run", t, timer, obj) @@ -304,9 +310,8 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) { runner.Run() waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond) - // Run again, once burst has replenished. + // Advance and do the deferred run timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms - runner.Run() waitForRun("fourth run", t, timer, obj) // Run again, once burst has fully replenished. @@ -318,15 +323,13 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) { runner.Run() waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second) - // Let the timer tick. + // Wait until minInterval after the last run timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms - timer.tick() - waitForRun("first tick", t, timer, obj) + waitForRun("seventh run", t, timer, obj) - // Let the timer tick. + // Wait for maxInterval timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms - timer.tick() - waitForRun("second tick", t, timer, obj) + waitForRun("maxInterval", t, timer, obj) // Clean up. stop <- struct{}{} From 4c5f4cb353a8840d8d0aa7f5248fbd9a256a9b70 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 1 Oct 2019 14:54:43 -0400 Subject: [PATCH 2/3] BoundedFrequencyRunner: add RetryAfter --- pkg/util/async/bounded_frequency_runner.go | 110 ++++++++++++++--- .../async/bounded_frequency_runner_test.go | 112 +++++++++++++++++- 2 files changed, 202 insertions(+), 20 deletions(-) diff --git a/pkg/util/async/bounded_frequency_runner.go b/pkg/util/async/bounded_frequency_runner.go index 5d74d53b13c..385b81f5e29 100644 --- a/pkg/util/async/bounded_frequency_runner.go +++ b/pkg/util/async/bounded_frequency_runner.go @@ -40,6 +40,10 @@ type BoundedFrequencyRunner struct { lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs + + retry chan struct{} // schedule a retry + retryMu sync.Mutex // guards retryTime + retryTime time.Time // when to retry } // designed so that flowcontrol.RateLimiter satisfies @@ -72,6 +76,9 @@ type timer interface { // See time.Now. Now() time.Time + // Remaining returns the time until the timer will go off (if it is running). + Remaining() time.Duration + // See time.Since. Since(t time.Time) time.Duration @@ -81,26 +88,40 @@ type timer interface { // implement our timer in terms of std time.Timer. type realTimer struct { - *time.Timer + timer *time.Timer + next time.Time } -func (rt realTimer) C() <-chan time.Time { - return rt.Timer.C +func (rt *realTimer) C() <-chan time.Time { + return rt.timer.C } -func (rt realTimer) Now() time.Time { +func (rt *realTimer) Reset(d time.Duration) bool { + rt.next = time.Now().Add(d) + return rt.timer.Reset(d) +} + +func (rt *realTimer) Stop() bool { + return rt.timer.Stop() +} + +func (rt *realTimer) Now() time.Time { return time.Now() } -func (rt realTimer) Since(t time.Time) time.Duration { +func (rt *realTimer) Remaining() time.Duration { + return rt.next.Sub(time.Now()) +} + +func (rt *realTimer) Since(t time.Time) time.Duration { return time.Since(t) } -func (rt realTimer) Sleep(d time.Duration) { +func (rt *realTimer) Sleep(d time.Duration) { time.Sleep(d) } -var _ timer = realTimer{} +var _ timer = &realTimer{} // NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, // which will manage runs of the specified function. @@ -132,8 +153,8 @@ var _ timer = realTimer{} // The maxInterval must be greater than or equal to the minInterval, If the // caller passes a maxInterval less than minInterval, this function will panic. func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { - timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately - <-timer.C() // consume the first tick + timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately + <-timer.C() // consume the first tick return construct(name, fn, minInterval, maxInterval, burstRuns, timer) } @@ -152,6 +173,7 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b minInterval: minInterval, maxInterval: maxInterval, run: make(chan struct{}, 1), + retry: make(chan struct{}, 1), timer: timer, } if minInterval == 0 { @@ -179,6 +201,8 @@ func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { bfr.tryRun() case <-bfr.run: bfr.tryRun() + case <-bfr.retry: + bfr.doRetry() } } } @@ -199,6 +223,36 @@ func (bfr *BoundedFrequencyRunner) Run() { } } +// RetryAfter ensures that the function will run again after no later than interval. This +// can be called from inside a run of the BoundedFrequencyRunner's function, or +// asynchronously. +func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration) { + // This could be called either with or without bfr.mu held, so we can't grab that + // lock, and therefore we can't update the timer directly. + + // If the Loop thread is currently running fn then it may be a while before it + // processes our retry request. But we want to retry at interval from now, not at + // interval from "whenever doRetry eventually gets called". So we convert to + // absolute time. + retryTime := bfr.timer.Now().Add(interval) + + // We can't just write retryTime to a channel because there could be multiple + // RetryAfter calls before Loop gets a chance to read from the channel. So we + // record the soonest requested retry time in bfr.retryTime and then only signal + // the Loop thread once, just like Run does. + bfr.retryMu.Lock() + defer bfr.retryMu.Unlock() + if !bfr.retryTime.IsZero() && bfr.retryTime.Before(retryTime) { + return + } + bfr.retryTime = retryTime + + select { + case bfr.retry <- struct{}{}: + default: + } +} + // assumes the lock is not held func (bfr *BoundedFrequencyRunner) stop() { bfr.mu.Lock() @@ -207,6 +261,27 @@ func (bfr *BoundedFrequencyRunner) stop() { bfr.timer.Stop() } +// assumes the lock is not held +func (bfr *BoundedFrequencyRunner) doRetry() { + bfr.mu.Lock() + defer bfr.mu.Unlock() + bfr.retryMu.Lock() + defer bfr.retryMu.Unlock() + + if bfr.retryTime.IsZero() { + return + } + + // Timer wants an interval not an absolute time, so convert retryTime back now + retryInterval := bfr.retryTime.Sub(bfr.timer.Now()) + bfr.retryTime = time.Time{} + if retryInterval < bfr.timer.Remaining() { + klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval) + bfr.timer.Stop() + bfr.timer.Reset(retryInterval) + } +} + // assumes the lock is not held func (bfr *BoundedFrequencyRunner) tryRun() { bfr.mu.Lock() @@ -223,17 +298,16 @@ func (bfr *BoundedFrequencyRunner) tryRun() { } // It can't run right now, figure out when it can run next. - - elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run - nextPossible := bfr.minInterval - elapsed // time to next possible run - nextScheduled := bfr.maxInterval - elapsed // time to next periodic run + elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run + nextPossible := bfr.minInterval - elapsed // time to next possible run + nextScheduled := bfr.timer.Remaining() // time to next scheduled run klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) + // It's hard to avoid race conditions in the unit tests unless we always reset + // the timer here, even when it's unchanged if nextPossible < nextScheduled { - // Set the timer for ASAP, but don't drain here. Assuming Loop is running, - // it might get a delivery in the mean time, but that is OK. - bfr.timer.Stop() - bfr.timer.Reset(nextPossible) - klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) + nextScheduled = nextPossible } + bfr.timer.Stop() + bfr.timer.Reset(nextScheduled) } diff --git a/pkg/util/async/bounded_frequency_runner_test.go b/pkg/util/async/bounded_frequency_runner_test.go index cb59ed32fd3..1f9791af878 100644 --- a/pkg/util/async/bounded_frequency_runner_test.go +++ b/pkg/util/async/bounded_frequency_runner_test.go @@ -24,14 +24,20 @@ import ( // Track calls to the managed function. type receiver struct { - lock sync.Mutex - run bool + lock sync.Mutex + run bool + retryFn func() } func (r *receiver) F() { r.lock.Lock() defer r.lock.Unlock() r.run = true + + if r.retryFn != nil { + r.retryFn() + r.retryFn = nil + } } func (r *receiver) reset() bool { @@ -42,6 +48,12 @@ func (r *receiver) reset() bool { return was } +func (r *receiver) setRetryFn(retryFn func()) { + r.lock.Lock() + defer r.lock.Unlock() + r.retryFn = retryFn +} + // A single change event in the fake timer. type timerUpdate struct { active bool @@ -106,6 +118,13 @@ func (ft *fakeTimer) Now() time.Time { return ft.now } +func (ft *fakeTimer) Remaining() time.Duration { + ft.lock.Lock() + defer ft.lock.Unlock() + + return ft.timeout.Sub(ft.now) +} + func (ft *fakeTimer) Since(t time.Time) time.Duration { ft.lock.Lock() defer ft.lock.Unlock() @@ -168,6 +187,12 @@ func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { waitForReset(name, t, timer, obj, true, maxInterval) } +func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { + // It will first get reset as with a normal run, and then get set again + waitForRun(name, t, timer, obj) + waitForReset(name, t, timer, obj, false, expectNext) +} + func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { waitForReset(name, t, timer, obj, false, expectNext) } @@ -334,3 +359,86 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) { // Clean up. stop <- struct{}{} } + +func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) { + obj := &receiver{} + timer := newFakeTimer() + runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) + stop := make(chan struct{}) + + var upd timerUpdate + + // Start. + go runner.Loop(stop) + upd = <-timer.updated // wait for initial time to be set to max + checkTimer("init", t, upd, true, maxInterval) + checkReceiver("init", t, obj, false) + + // Run once, immediately, and queue a retry + // rel=0ms + obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) }) + runner.Run() + waitForRunWithRetry("first run", t, timer, obj, 5*time.Second) + + // Nothing happens... + timer.advance(time.Second) // rel=1000ms + waitForNothing("minInterval, nothing queued", t, timer, obj) + + // After retryInterval, function is called + timer.advance(4 * time.Second) // rel=5000ms + waitForRun("retry", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(499 * time.Millisecond) // rel=499ms + runner.Run() + waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond) + + // Do the deferred run, queue another retry after it returns + timer.advance(501 * time.Millisecond) // rel=1000ms + runner.RetryAfter(5 * time.Second) + waitForRunWithRetry("second run", t, timer, obj, 5*time.Second) + + // Wait for minInterval to pass + timer.advance(time.Second) // rel=1000ms + waitForNothing("minInterval, nothing queued", t, timer, obj) + + // Now do another run + runner.Run() + waitForRun("third run", t, timer, obj) + + // Retry was cancelled because we already ran + timer.advance(4 * time.Second) + waitForNothing("retry cancelled", t, timer, obj) + + // Run, queue a retry from a goroutine + obj.setRetryFn(func() { + go func() { + time.Sleep(100 * time.Millisecond) + runner.RetryAfter(5 * time.Second) + }() + }) + runner.Run() + waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second) + + // Call Run again before minInterval passes + timer.advance(100 * time.Millisecond) // rel=100ms + runner.Run() + waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond) + + // Deferred run will run after minInterval passes + timer.advance(900 * time.Millisecond) // rel=1000ms + waitForRun("fifth run", t, timer, obj) + + // Retry was cancelled because we already ran + timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter + waitForNothing("retry cancelled", t, timer, obj) + + // Rerun happens after maxInterval + timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter + waitForNothing("premature", t, timer, obj) + timer.advance(time.Second) // rel=10s since run + waitForRun("maxInterval", t, timer, obj) + + // Clean up. + stop <- struct{}{} +} From 2fd42dee9526a580d2873b4780ac743fca1cebd5 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 1 Oct 2019 17:40:00 -0400 Subject: [PATCH 3/3] If an iptables proxier sync fails, retry after iptablesSyncPeriod --- pkg/proxy/iptables/proxier.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 2dd0389dc55..28a48940304 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -189,6 +189,7 @@ type Proxier struct { servicesSynced bool initialized int32 syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules + syncPeriod time.Duration // These are effectively const and do not need the mutex to be held. iptables utiliptables.Interface @@ -301,6 +302,7 @@ func NewProxier(ipt utiliptables.Interface, serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled), + syncPeriod: syncPeriod, iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -722,6 +724,14 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).Info("Syncing iptables rules") + success := false + defer func() { + if !success { + klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod) + proxier.syncRunner.RetryAfter(proxier.syncPeriod) + } + }() + // Create and link the kube chains. for _, jump := range iptablesJumpChains { if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil { @@ -1437,6 +1447,8 @@ func (proxier *Proxier) syncProxyRules() { utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } + success = true + for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { for _, lastChangeTriggerTime := range lastChangeTriggerTimes { latency := metrics.SinceInSeconds(lastChangeTriggerTime)