diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 31b94ea2..466a708e 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -139,11 +139,11 @@ func (c *controller) Run(stopCh <-chan struct{}) { ReflectorOptions{ ResyncPeriod: c.config.FullResyncPeriod, TypeDescription: c.config.ObjectDescription, + Clock: c.clock, }, ) r.ShouldResync = c.config.ShouldResync r.WatchListPageSize = c.config.WatchListPageSize - r.clock = c.clock if c.config.WatchErrorHandler != nil { r.watchErrorHandler = c.config.WatchErrorHandler } diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 7325ae5f..9aaaac61 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -183,6 +183,9 @@ type ReflectorOptions struct { // ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0 // (do not resync). ResyncPeriod time.Duration + + // Clock allows tests to control time. If unset defaults to clock.RealClock{} + Clock clock.Clock } // NewReflectorWithOptions creates a new Reflector object which will keep the @@ -196,7 +199,10 @@ type ReflectorOptions struct { // everything as well as incrementally processing the things that // change. func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector { - realClock := &clock.RealClock{} + reflectorClock := options.Clock + if reflectorClock == nil { + reflectorClock = clock.RealClock{} + } r := &Reflector{ name: options.Name, resyncPeriod: options.ResyncPeriod, @@ -206,9 +212,9 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. - backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), - initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), - clock: realClock, + backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), + initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), + clock: reflectorClock, watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), expectedType: reflect.TypeOf(expectedType), } diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 71f154b0..0ae254b8 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -348,6 +348,18 @@ func TestSharedInformerWatchDisruption(t *testing.T) { // Simulate a connection loss (or even just a too-old-watch) source.ResetWatch() + // Wait long enough for the reflector to exit and the backoff function to start waiting + // on the fake clock, otherwise advancing the fake clock will have no effect. + // TODO: Make this deterministic by counting the number of waiters on FakeClock + time.Sleep(10 * time.Millisecond) + + // Advance the clock to cause the backoff wait to expire. + clock.Step(1601 * time.Millisecond) + + // Wait long enough for backoff to invoke ListWatch a second time and distribute events + // to listeners. + time.Sleep(10 * time.Millisecond) + for _, listener := range listeners { if !listener.ok() { t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)