From 08e22c4b64e5ac60632c1a8e8923c23806207807 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 14 Jan 2023 14:17:33 -0500 Subject: [PATCH] cache: Reflector should have the same injected clock as its informer While refactoring the backoff manager to simplify and unify the code in wait, a race condition was encountered in TestSharedInformerWatchDisruption. The new implementation failed because the fake clock was not propagated to the backoff managers when the reflector was used in a controller. After ensuring the managers, reflector, controller, and informer shared the same clock, the test was updated to avoid the race condition by advancing the fake clock and adding real sleeps to wait for asynchronous propagation of the various goroutines in the controller. Due to the deep structure of informers it is difficult to inject hooks to avoid having to perform sleeps. At a minimum the FakeClock interface should allow a caller to determine the number of waiting timers (to avoid the first sleep). 
Kubernetes-commit: 91b3a81fbd916713afe215f7d701950e13a02869 --- tools/cache/controller.go | 2 +- tools/cache/reflector.go | 14 ++++++++++---- tools/cache/shared_informer_test.go | 12 ++++++++++++ 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 31b94ea2..466a708e 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -139,11 +139,11 @@ func (c *controller) Run(stopCh <-chan struct{}) { ReflectorOptions{ ResyncPeriod: c.config.FullResyncPeriod, TypeDescription: c.config.ObjectDescription, + Clock: c.clock, }, ) r.ShouldResync = c.config.ShouldResync r.WatchListPageSize = c.config.WatchListPageSize - r.clock = c.clock if c.config.WatchErrorHandler != nil { r.watchErrorHandler = c.config.WatchErrorHandler } diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 7325ae5f..9aaaac61 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -183,6 +183,9 @@ type ReflectorOptions struct { // ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0 // (do not resync). ResyncPeriod time.Duration + + // Clock allows tests to control time. If unset defaults to clock.RealClock{} + Clock clock.Clock } // NewReflectorWithOptions creates a new Reflector object which will keep the @@ -196,7 +199,10 @@ type ReflectorOptions struct { // everything as well as incrementally processing the things that // change. 
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector { - realClock := &clock.RealClock{} + reflectorClock := options.Clock + if reflectorClock == nil { + reflectorClock = clock.RealClock{} + } r := &Reflector{ name: options.Name, resyncPeriod: options.ResyncPeriod, @@ -206,9 +212,9 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. - backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), - initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), - clock: realClock, + backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), + initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), + clock: reflectorClock, watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), expectedType: reflect.TypeOf(expectedType), } diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 71f154b0..0ae254b8 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -348,6 +348,18 @@ func TestSharedInformerWatchDisruption(t *testing.T) { // Simulate a connection loss (or even just a too-old-watch) source.ResetWatch() + // Wait long enough for the reflector to exit and the backoff function to start waiting + // on the fake clock, otherwise advancing the fake clock will have no effect. 
+ // TODO: Make this deterministic by counting the number of waiters on FakeClock + time.Sleep(10 * time.Millisecond) + + // Advance the clock to cause the backoff wait to expire. + clock.Step(1601 * time.Millisecond) + + // Wait long enough for backoff to invoke ListWatch a second time and distribute events + // to listeners. + time.Sleep(10 * time.Millisecond) + for _, listener := range listeners { if !listener.ok() { t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)