diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 02bf0ed6de2..3fa72480d75 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -189,6 +189,7 @@ type Proxier struct { servicesSynced bool initialized int32 syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules + syncPeriod time.Duration // These are effectively const and do not need the mutex to be held. iptables utiliptables.Interface @@ -296,6 +297,7 @@ func NewProxier(ipt utiliptables.Interface, serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled), + syncPeriod: syncPeriod, iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -717,6 +719,14 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).Info("Syncing iptables rules") + success := false + defer func() { + if !success { + klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod) + proxier.syncRunner.RetryAfter(proxier.syncPeriod) + } + }() + // Create and link the kube chains. for _, jump := range iptablesJumpChains { if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil { @@ -1432,6 +1442,8 @@ func (proxier *Proxier) syncProxyRules() { utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } + success = true + for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { for _, lastChangeTriggerTime := range lastChangeTriggerTimes { latency := metrics.SinceInSeconds(lastChangeTriggerTime) diff --git a/pkg/util/async/bounded_frequency_runner.go b/pkg/util/async/bounded_frequency_runner.go index 5d74d53b13c..385b81f5e29 100644 --- a/pkg/util/async/bounded_frequency_runner.go +++ b/pkg/util/async/bounded_frequency_runner.go @@ -40,6 +40,10 @@ type BoundedFrequencyRunner struct { lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs + + retry chan struct{} // schedule a retry + retryMu sync.Mutex // guards retryTime + retryTime time.Time // when to retry } // designed so that flowcontrol.RateLimiter satisfies @@ -72,6 +76,9 @@ type timer interface { // See time.Now. Now() time.Time + // Remaining returns the time until the timer will go off (if it is running). + Remaining() time.Duration + // See time.Since. Since(t time.Time) time.Duration @@ -81,26 +88,40 @@ type timer interface { // implement our timer in terms of std time.Timer. type realTimer struct { - *time.Timer + timer *time.Timer + next time.Time } -func (rt realTimer) C() <-chan time.Time { - return rt.Timer.C +func (rt *realTimer) C() <-chan time.Time { + return rt.timer.C } -func (rt realTimer) Now() time.Time { +func (rt *realTimer) Reset(d time.Duration) bool { + rt.next = time.Now().Add(d) + return rt.timer.Reset(d) +} + +func (rt *realTimer) Stop() bool { + return rt.timer.Stop() +} + +func (rt *realTimer) Now() time.Time { return time.Now() } -func (rt realTimer) Since(t time.Time) time.Duration { +func (rt *realTimer) Remaining() time.Duration { + return rt.next.Sub(time.Now()) +} + +func (rt *realTimer) Since(t time.Time) time.Duration { return time.Since(t) } -func (rt realTimer) Sleep(d time.Duration) { +func (rt *realTimer) Sleep(d time.Duration) { time.Sleep(d) } -var _ timer = realTimer{} +var _ timer = &realTimer{} // NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, // which will manage runs of the specified function. @@ -132,8 +153,8 @@ var _ timer = realTimer{} // The maxInterval must be greater than or equal to the minInterval, If the // caller passes a maxInterval less than minInterval, this function will panic. func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { - timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately - <-timer.C() // consume the first tick + timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately + <-timer.C() // consume the first tick return construct(name, fn, minInterval, maxInterval, burstRuns, timer) } @@ -152,6 +173,7 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b minInterval: minInterval, maxInterval: maxInterval, run: make(chan struct{}, 1), + retry: make(chan struct{}, 1), timer: timer, } if minInterval == 0 { @@ -179,6 +201,8 @@ func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { bfr.tryRun() case <-bfr.run: bfr.tryRun() + case <-bfr.retry: + bfr.doRetry() } } } @@ -199,6 +223,36 @@ func (bfr *BoundedFrequencyRunner) Run() { } } +// RetryAfter ensures that the function will run again after no later than interval. This +// can be called from inside a run of the BoundedFrequencyRunner's function, or +// asynchronously. +func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration) { + // This could be called either with or without bfr.mu held, so we can't grab that + // lock, and therefore we can't update the timer directly. + + // If the Loop thread is currently running fn then it may be a while before it + // processes our retry request. But we want to retry at interval from now, not at + // interval from "whenever doRetry eventually gets called". So we convert to + // absolute time. + retryTime := bfr.timer.Now().Add(interval) + + // We can't just write retryTime to a channel because there could be multiple + // RetryAfter calls before Loop gets a chance to read from the channel. So we + // record the soonest requested retry time in bfr.retryTime and then only signal + // the Loop thread once, just like Run does. + bfr.retryMu.Lock() + defer bfr.retryMu.Unlock() + if !bfr.retryTime.IsZero() && bfr.retryTime.Before(retryTime) { + return + } + bfr.retryTime = retryTime + + select { + case bfr.retry <- struct{}{}: + default: + } +} + // assumes the lock is not held func (bfr *BoundedFrequencyRunner) stop() { bfr.mu.Lock() @@ -207,6 +261,27 @@ func (bfr *BoundedFrequencyRunner) stop() { bfr.timer.Stop() } +// assumes the lock is not held +func (bfr *BoundedFrequencyRunner) doRetry() { + bfr.mu.Lock() + defer bfr.mu.Unlock() + bfr.retryMu.Lock() + defer bfr.retryMu.Unlock() + + if bfr.retryTime.IsZero() { + return + } + + // Timer wants an interval not an absolute time, so convert retryTime back now + retryInterval := bfr.retryTime.Sub(bfr.timer.Now()) + bfr.retryTime = time.Time{} + if retryInterval < bfr.timer.Remaining() { + klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval) + bfr.timer.Stop() + bfr.timer.Reset(retryInterval) + } +} + // assumes the lock is not held func (bfr *BoundedFrequencyRunner) tryRun() { bfr.mu.Lock() @@ -223,17 +298,16 @@ func (bfr *BoundedFrequencyRunner) tryRun() { } // It can't run right now, figure out when it can run next. - - elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run - nextPossible := bfr.minInterval - elapsed // time to next possible run - nextScheduled := bfr.maxInterval - elapsed // time to next periodic run + elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run + nextPossible := bfr.minInterval - elapsed // time to next possible run + nextScheduled := bfr.timer.Remaining() // time to next scheduled run klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) + // It's hard to avoid race conditions in the unit tests unless we always reset + // the timer here, even when it's unchanged if nextPossible < nextScheduled { - // Set the timer for ASAP, but don't drain here. Assuming Loop is running, - // it might get a delivery in the mean time, but that is OK. - bfr.timer.Stop() - bfr.timer.Reset(nextPossible) - klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) + nextScheduled = nextPossible } + bfr.timer.Stop() + bfr.timer.Reset(nextScheduled) } diff --git a/pkg/util/async/bounded_frequency_runner_test.go b/pkg/util/async/bounded_frequency_runner_test.go index f21bf58b1af..1f9791af878 100644 --- a/pkg/util/async/bounded_frequency_runner_test.go +++ b/pkg/util/async/bounded_frequency_runner_test.go @@ -24,14 +24,20 @@ import ( // Track calls to the managed function. type receiver struct { - lock sync.Mutex - run bool + lock sync.Mutex + run bool + retryFn func() } func (r *receiver) F() { r.lock.Lock() defer r.lock.Unlock() r.run = true + + if r.retryFn != nil { + r.retryFn() + r.retryFn = nil + } } func (r *receiver) reset() bool { @@ -42,6 +48,12 @@ func (r *receiver) reset() bool { return was } +func (r *receiver) setRetryFn(retryFn func()) { + r.lock.Lock() + defer r.lock.Unlock() + r.retryFn = retryFn +} + // A single change event in the fake timer. type timerUpdate struct { active bool @@ -52,16 +64,17 @@ type timerUpdate struct { type fakeTimer struct { c chan time.Time - lock sync.Mutex - now time.Time - active bool + lock sync.Mutex + now time.Time + timeout time.Time + active bool updated chan timerUpdate } func newFakeTimer() *fakeTimer { ft := &fakeTimer{ - now: time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC), + now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), c: make(chan time.Time), updated: make(chan timerUpdate), } @@ -78,6 +91,7 @@ func (ft *fakeTimer) Reset(in time.Duration) bool { was := ft.active ft.active = true + ft.timeout = ft.now.Add(in) ft.updated <- timerUpdate{ active: true, next: in, @@ -104,6 +118,13 @@ func (ft *fakeTimer) Now() time.Time { return ft.now } +func (ft *fakeTimer) Remaining() time.Duration { + ft.lock.Lock() + defer ft.lock.Unlock() + + return ft.timeout.Sub(ft.now) +} + func (ft *fakeTimer) Since(t time.Time) time.Duration { ft.lock.Lock() defer ft.lock.Unlock() @@ -112,9 +133,7 @@ func (ft *fakeTimer) Since(t time.Time) time.Duration { } func (ft *fakeTimer) Sleep(d time.Duration) { - ft.lock.Lock() - defer ft.lock.Unlock() - + // ft.advance grabs ft.lock ft.advance(d) } @@ -124,15 +143,10 @@ func (ft *fakeTimer) advance(d time.Duration) { defer ft.lock.Unlock() ft.now = ft.now.Add(d) -} - -// send a timer tick. -func (ft *fakeTimer) tick() { - ft.lock.Lock() - defer ft.lock.Unlock() - - ft.active = false - ft.c <- ft.now + if ft.active && !ft.now.Before(ft.timeout) { + ft.active = false + ft.c <- ft.timeout + } } // return the calling line number (for printing) @@ -173,10 +187,27 @@ func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { waitForReset(name, t, timer, obj, true, maxInterval) } +func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { + // It will first get reset as with a normal run, and then get set again + waitForRun(name, t, timer, obj) + waitForReset(name, t, timer, obj, false, expectNext) +} + func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { waitForReset(name, t, timer, obj, false, expectNext) } +func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) { + select { + case <-timer.c: + t.Fatalf("%s: unexpected timer tick", name) + case upd := <-timer.updated: + t.Fatalf("%s: unexpected timer update %v", name, upd) + default: + } + checkReceiver(name, t, obj, false) +} + func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { obj := &receiver{} timer := newFakeTimer() @@ -206,13 +237,11 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { runner.Run() waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond) - // Run again, once minInterval has passed (race with timer). + // Do the deferred run timer.advance(1 * time.Millisecond) // rel=1000ms - runner.Run() waitForRun("second run", t, timer, obj) - // Run again, before minInterval expires. - // rel=0ms + // Try again immediately runner.Run() waitForDefer("too soon after second", t, timer, obj, 1*time.Second) @@ -221,30 +250,30 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { runner.Run() waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond) - // Let the timer tick prematurely. + // Ensure that we don't run again early timer.advance(998 * time.Millisecond) // rel=999ms - timer.tick() - waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond) + waitForNothing("premature", t, timer, obj) - // Let the timer tick. + // Do the deferred run timer.advance(1 * time.Millisecond) // rel=1000ms - timer.tick() - waitForRun("first tick", t, timer, obj) + waitForRun("third run", t, timer, obj) - // Let the timer tick. - timer.advance(10 * time.Second) // rel=10000ms - timer.tick() - waitForRun("second tick", t, timer, obj) + // Let minInterval pass, but there are no runs queued + timer.advance(1 * time.Second) // rel=1000ms + waitForNothing("minInterval", t, timer, obj) + + // Let maxInterval pass + timer.advance(9 * time.Second) // rel=10000ms + waitForRun("maxInterval", t, timer, obj) // Run again, before minInterval expires. timer.advance(1 * time.Millisecond) // rel=1ms runner.Run() - waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond) + waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond) - // Let the timer tick. + // Let minInterval pass timer.advance(999 * time.Millisecond) // rel=1000ms - timer.tick() - waitForRun("third tick", t, timer, obj) + waitForRun("fourth run", t, timer, obj) // Clean up. stop <- struct{}{} @@ -289,8 +318,10 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) { runner.Run() waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond) - // Run again, once burst has replenished. + // Advance timer enough to replenish bursts, but not enough to be minInterval + // after the last run timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms + waitForNothing("not minInterval", t, timer, obj) runner.Run() waitForRun("third run", t, timer, obj) @@ -304,9 +335,8 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) { runner.Run() waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond) - // Run again, once burst has replenished. + // Advance and do the deferred run timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms - runner.Run() waitForRun("fourth run", t, timer, obj) // Run again, once burst has fully replenished. @@ -318,15 +348,96 @@ func Test_BoundedFrequencyRunnerBurst(t *testing.T) { runner.Run() waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second) - // Let the timer tick. + // Wait until minInterval after the last run timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms - timer.tick() - waitForRun("first tick", t, timer, obj) + waitForRun("seventh run", t, timer, obj) - // Let the timer tick. + // Wait for maxInterval timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms - timer.tick() - waitForRun("second tick", t, timer, obj) + waitForRun("maxInterval", t, timer, obj) + + // Clean up. + stop <- struct{}{} +} + +func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) { + obj := &receiver{} + timer := newFakeTimer() + runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) + stop := make(chan struct{}) + + var upd timerUpdate + + // Start. + go runner.Loop(stop) + upd = <-timer.updated // wait for initial time to be set to max + checkTimer("init", t, upd, true, maxInterval) + checkReceiver("init", t, obj, false) + + // Run once, immediately, and queue a retry + // rel=0ms + obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) }) + runner.Run() + waitForRunWithRetry("first run", t, timer, obj, 5*time.Second) + + // Nothing happens... + timer.advance(time.Second) // rel=1000ms + waitForNothing("minInterval, nothing queued", t, timer, obj) + + // After retryInterval, function is called + timer.advance(4 * time.Second) // rel=5000ms + waitForRun("retry", t, timer, obj) + + // Run again, before minInterval expires. + timer.advance(499 * time.Millisecond) // rel=499ms + runner.Run() + waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond) + + // Do the deferred run, queue another retry after it returns + timer.advance(501 * time.Millisecond) // rel=1000ms + runner.RetryAfter(5 * time.Second) + waitForRunWithRetry("second run", t, timer, obj, 5*time.Second) + + // Wait for minInterval to pass + timer.advance(time.Second) // rel=1000ms + waitForNothing("minInterval, nothing queued", t, timer, obj) + + // Now do another run + runner.Run() + waitForRun("third run", t, timer, obj) + + // Retry was cancelled because we already ran + timer.advance(4 * time.Second) + waitForNothing("retry cancelled", t, timer, obj) + + // Run, queue a retry from a goroutine + obj.setRetryFn(func() { + go func() { + time.Sleep(100 * time.Millisecond) + runner.RetryAfter(5 * time.Second) + }() + }) + runner.Run() + waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second) + + // Call Run again before minInterval passes + timer.advance(100 * time.Millisecond) // rel=100ms + runner.Run() + waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond) + + // Deferred run will run after minInterval passes + timer.advance(900 * time.Millisecond) // rel=1000ms + waitForRun("fifth run", t, timer, obj) + + // Retry was cancelled because we already ran + timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter + waitForNothing("retry cancelled", t, timer, obj) + + // Rerun happens after maxInterval + timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter + waitForNothing("premature", t, timer, obj) + timer.advance(time.Second) // rel=10s since run + waitForRun("maxInterval", t, timer, obj) // Clean up. stop <- struct{}{}