diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 66af9febc..8ed7fbd09 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "math" "math/rand" "reflect" "strings" @@ -50,9 +51,21 @@ import ( const defaultExpectedTypeName = "" -// We try to spread the load on apiserver by setting timeouts for -// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. -var defaultMinWatchTimeout = 5 * time.Minute +var ( + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. + defaultMinWatchTimeout = 5 * time.Minute + defaultMaxWatchTimeout = 2 * defaultMinWatchTimeout + // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when + // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is + // 0.22 QPS. + defaultBackoffInit = 800 * time.Millisecond + defaultBackoffMax = 30 * time.Second + // If we don't backoff for 2min, assume API server is healthy and we reset the backoff. + defaultBackoffReset = 2 * time.Minute + defaultBackoffFactor = 2.0 + defaultBackoffJitter = 1.0 +) // ReflectorStore is the subset of cache.Store that the reflector uses type ReflectorStore interface { @@ -110,11 +123,14 @@ type Reflector struct { store ReflectorStore // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcherWithContext - // backoff manages backoff of ListWatch - backoffManager wait.BackoffManager - resyncPeriod time.Duration + // delay returns the next backoff interval for retries. + resyncPeriod time.Duration + delayHandler wait.DelayFunc // minWatchTimeout defines the minimum timeout for watch requests. minWatchTimeout time.Duration + // maxWatchTimeout defines the maximum timeout for watch requests. + // Actual timeout is random in [minWatchTimeout, maxWatchTimeout]. + maxWatchTimeout time.Duration // clock allows tests to manipulate time clock clock.Clock // paginatedResult defines whether pagination should be forced for list calls. @@ -259,6 +275,12 @@ type ReflectorOptions struct { // Clock allows tests to control time. If unset defaults to clock.RealClock{} Clock clock.Clock + + // Backoff is an optional custom backoff configuration. + // If set, it will be used instead of the default exponential backoff. + // DelayWithReset(clock, resetDuration) will be called on it to create the delay function. + // TODO(#136943): Expose this configuration through SharedInformerFactory. + Backoff *wait.Backoff } // NewReflectorWithOptions creates a new Reflector object which will keep the @@ -276,21 +298,42 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R if reflectorClock == nil { reflectorClock = clock.RealClock{} } + minWatchTimeout := defaultMinWatchTimeout + maxWatchTimeout := defaultMaxWatchTimeout if options.MinWatchTimeout > defaultMinWatchTimeout { minWatchTimeout = options.MinWatchTimeout + maxWatchTimeout = 2 * minWatchTimeout } + if maxWatchTimeout < minWatchTimeout { + klog.TODO().V(3).Info( + "maxWatchTimeout was less than minWatchTimeout, overriding to minWatchTimeout. Watch timeout randomization is disabled.", + "minWatchTimeout", minWatchTimeout, + "maxWatchTimeout", maxWatchTimeout, + ) + maxWatchTimeout = minWatchTimeout + } + + backoff := options.Backoff + if backoff == nil { + backoff = &wait.Backoff{ + Duration: defaultBackoffInit, + Cap: defaultBackoffMax, + Steps: int(math.Ceil(float64(defaultBackoffMax) / float64(defaultBackoffInit))), + Factor: defaultBackoffFactor, + Jitter: defaultBackoffJitter, + } + } + r := &Reflector{ - name: options.Name, - resyncPeriod: options.ResyncPeriod, - minWatchTimeout: minWatchTimeout, - typeDescription: options.TypeDescription, - listerWatcher: ToListerWatcherWithContext(lw), - store: store, - // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when - // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is - // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. - backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), + name: options.Name, + resyncPeriod: options.ResyncPeriod, + minWatchTimeout: minWatchTimeout, + maxWatchTimeout: maxWatchTimeout, + typeDescription: options.TypeDescription, + listerWatcher: ToListerWatcherWithContext(lw), + store: store, + delayHandler: backoff.DelayWithReset(reflectorClock, defaultBackoffReset), clock: reflectorClock, watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler), expectedType: reflect.TypeOf(expectedType), @@ -380,11 +423,16 @@ func (r *Reflector) Run(stopCh <-chan struct{}) { func (r *Reflector) RunWithContext(ctx context.Context) { logger := klog.FromContext(ctx) logger.V(3).Info("Starting reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name) - wait.BackoffUntil(func() { + // Until runs the loop immediately (immediate=true) and resets the backoff timer after each + // successful iteration (sliding=true). See backoff constants at top of file for generalized QPS targets (~0.22 QPS). + if err := r.delayHandler.Until(ctx, true, true, func(ctx context.Context) (bool, error) { if err := r.ListAndWatchWithContext(ctx); err != nil { r.watchErrorHandler(ctx, r, err) } - }, r.backoffManager, true, ctx.Done()) + return false, nil + }); err != nil { + logger.Error(err, "Reflector stopped with error", "type", r.typeDescription, "reflector", r.name) + } logger.V(3).Info("Stopping reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name) } @@ -539,7 +587,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha // if w is already initialized, it must be past any synthetic non-rv-ordered added events propagateRVFromStart := true if w == nil { - timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + timeoutSeconds := int64(r.minWatchTimeout.Seconds() + rand.Float64()*(r.maxWatchTimeout.Seconds()-r.minWatchTimeout.Seconds())) options := metav1.ListOptions{ ResourceVersion: r.LastSyncResourceVersion(), // We want to avoid situations of hanging watchers. Stop any watchers that do not @@ -563,7 +611,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha select { case <-stopCh: return nil - case <-r.backoffManager.Backoff().C(): + case <-r.clock.After(r.delayHandler()): continue } } @@ -608,7 +656,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha select { case <-stopCh: return nil - case <-r.backoffManager.Backoff().C(): + case <-r.clock.After(r.delayHandler()): continue } case apierrors.IsInternalError(err) && retry.ShouldRetry(): @@ -762,7 +810,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { isErrorRetriableWithSideEffectsFn := func(err error) bool { if canRetry := isWatchErrorRetriable(err); canRetry { logger.V(2).Info("watch-list failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err) - <-r.backoffManager.Backoff().C() + <-r.clock.After(r.delayHandler()) return true } if isExpiredError(err) || isTooLargeResourceVersionError(err) { @@ -798,7 +846,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { // TODO(#115478): large "list", slow clients, slow network, p&f // might slow down streaming and eventually fail. // maybe in such a case we should retry with an increased timeout? - timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + timeoutSeconds := int64(r.minWatchTimeout.Seconds() + rand.Float64()*(r.maxWatchTimeout.Seconds()-r.minWatchTimeout.Seconds())) options := metav1.ListOptions{ ResourceVersion: lastKnownRV, AllowWatchBookmarks: true, diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 1d1400feb..337c608fb 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -745,7 +745,14 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { ctx, cancel := context.WithCancelCause(ctx) connFails := test.numConnFails fakeClock := testingclock.NewFakeClock(time.Unix(0, 0)) - bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock) + backoff := wait.Backoff{ + Duration: time.Millisecond, + Cap: maxBackoff, + Steps: 1000, // large number to not run out + Factor: 2.0, + Jitter: 1.0, + } + delayFn := backoff.DelayWithReset(fakeClock, 100*time.Millisecond) done := make(chan struct{}) defer close(done) go func() { @@ -784,7 +791,7 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { name: "test-reflector", listerWatcher: lw, store: NewFIFO(MetaNamespaceKeyFunc), - backoffManager: bm, + delayHandler: delayFn, clock: fakeClock, watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler), } @@ -804,28 +811,103 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { } } -type fakeBackoff struct { - clock clock.Clock +func TestNewReflectorWithCustomBackoff(t *testing.T) { + testCases := []struct { + name string + backoff *wait.Backoff + numConnFails int + expLowerBound time.Duration + expUpperBound time.Duration + }{ + { + // Default backoff uses jitter so timing is non-deterministic + // Just verify it completes without error + name: "default backoff", + backoff: nil, + numConnFails: 2, + expLowerBound: 0, + expUpperBound: 10 * time.Second, + }, + { + // Custom backoff: 10ms initial, 2x factor, no jitter, 100ms cap + // After 5 failures: 10 + 20 + 40 + 80 + 100 = 250ms + name: "custom backoff", + backoff: &wait.Backoff{ + Duration: 10 * time.Millisecond, + Factor: 2.0, + Jitter: 0, + Steps: 100, + Cap: 100 * time.Millisecond, + }, + numConnFails: 5, + expLowerBound: 250 * time.Millisecond, + expUpperBound: 300 * time.Millisecond, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + connFails := tc.numConnFails + + lw := &ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if connFails > 0 { + connFails-- + return nil, syscall.ECONNREFUSED + } + cancel(errors.New("done")) + return watch.NewFake(), nil + }, + } + + opts := ReflectorOptions{ + Backoff: tc.backoff, + } + r := NewReflectorWithOptions(lw, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), opts) + + start := time.Now() + err := r.ListAndWatchWithContext(ctx) + elapsed := time.Since(start) + + if err != nil { + t.Errorf("unexpected error %v", err) + } + if elapsed < tc.expLowerBound { + t.Errorf("expected lower bound %v, got %v", tc.expLowerBound, elapsed) + } + if elapsed > tc.expUpperBound { + t.Errorf("expected upper bound %v, got %v", tc.expUpperBound, elapsed) + } + }) + } +} + +type fakeDelayFunc struct { calls int } -func (f *fakeBackoff) Backoff() clock.Timer { +func (f *fakeDelayFunc) delayFunc() time.Duration { f.calls++ - return f.clock.NewTimer(time.Duration(0)) + return 0 } func TestBackoffOnTooManyRequests(t *testing.T) { _, ctx := ktesting.NewTestContext(t) err := apierrors.NewTooManyRequests("too many requests", 1) clock := &clock.RealClock{} - bm := &fakeBackoff{clock: clock} + fd := &fakeDelayFunc{} lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - switch bm.calls { + switch fd.calls { case 0: return nil, err case 1: @@ -845,7 +927,7 @@ func TestBackoffOnTooManyRequests(t *testing.T) { name: "test-reflector", listerWatcher: lw, store: NewFIFO(MetaNamespaceKeyFunc), - backoffManager: bm, + delayHandler: fd.delayFunc, clock: clock, watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler), } @@ -855,15 +937,15 @@ func TestBackoffOnTooManyRequests(t *testing.T) { t.Fatal(err) } close(stopCh) - if bm.calls != 2 { - t.Errorf("unexpected watch backoff calls: %d", bm.calls) + if fd.calls != 2 { + t.Errorf("unexpected watch backoff calls: %d", fd.calls) } } func TestNoRelistOnTooManyRequests(t *testing.T) { err := apierrors.NewTooManyRequests("too many requests", 1) clock := &clock.RealClock{} - bm := &fakeBackoff{clock: clock} + fd := &fakeDelayFunc{} listCalls, watchCalls := 0, 0 lw := &ListWatch{ @@ -886,7 +968,7 @@ func TestNoRelistOnTooManyRequests(t *testing.T) { name: "test-reflector", listerWatcher: lw, store: NewFIFO(MetaNamespaceKeyFunc), - backoffManager: bm, + delayHandler: fd.delayFunc, clock: clock, watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler), } @@ -933,7 +1015,7 @@ func TestRetryInternalError(t *testing.T) { for _, tc := range testCases { err := apierrors.NewInternalError(fmt.Errorf("etcdserver: no leader")) fakeClock := testingclock.NewFakeClock(time.Now()) - bm := &fakeBackoff{clock: fakeClock} + fd := &fakeDelayFunc{} counter := 0 @@ -961,7 +1043,7 @@ func TestRetryInternalError(t *testing.T) { name: "test-reflector", listerWatcher: lw, store: NewFIFO(MetaNamespaceKeyFunc), - backoffManager: bm, + delayHandler: fd.delayFunc, clock: fakeClock, watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler), }