Improve backoff policy in reflector.

Before, we've used two separate backoff managers for List and Watch
calls, now they share single backoff manager.

Kubernetes-commit: 337728b02559dec8a613fdef174f732da9cae310
This commit is contained in:
Marcel Zieba 2023-05-19 14:28:31 +02:00 committed by Kubernetes Publisher
parent c670796be1
commit bae10246dd
2 changed files with 26 additions and 29 deletions

View File

@ -70,9 +70,7 @@ type Reflector struct {
listerWatcher ListerWatcher listerWatcher ListerWatcher
// backoff manages backoff of ListWatch // backoff manages backoff of ListWatch
backoffManager wait.BackoffManager backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch. resyncPeriod time.Duration
initConnBackoffManager wait.BackoffManager
resyncPeriod time.Duration
// clock allows tests to manipulate time // clock allows tests to manipulate time
clock clock.Clock clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls. // paginatedResult defines whether pagination should be forced for list calls.
@ -221,11 +219,10 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), clock: reflectorClock,
clock: reflectorClock, watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), expectedType: reflect.TypeOf(expectedType),
expectedType: reflect.TypeOf(expectedType),
} }
if r.name == "" { if r.name == "" {
@ -425,7 +422,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
select { select {
case <-stopCh: case <-stopCh:
return nil return nil
case <-r.initConnBackoffManager.Backoff().C(): case <-r.backoffManager.Backoff().C():
continue continue
} }
} }
@ -451,7 +448,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
select { select {
case <-stopCh: case <-stopCh:
return nil return nil
case <-r.initConnBackoffManager.Backoff().C(): case <-r.backoffManager.Backoff().C():
continue continue
} }
case apierrors.IsInternalError(err) && retry.ShouldRetry(): case apierrors.IsInternalError(err) && retry.ShouldRetry():
@ -604,7 +601,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
isErrorRetriableWithSideEffectsFn := func(err error) bool { isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry { if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err) klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
<-r.initConnBackoffManager.Backoff().C() <-r.backoffManager.Backoff().C()
return true return true
} }
if isExpiredError(err) || isTooLargeResourceVersionError(err) { if isExpiredError(err) || isTooLargeResourceVersionError(err) {

View File

@ -411,12 +411,12 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
}, },
} }
r := &Reflector{ r := &Reflector{
name: "test-reflector", name: "test-reflector",
listerWatcher: lw, listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm, backoffManager: bm,
clock: fakeClock, clock: fakeClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
} }
start := fakeClock.Now() start := fakeClock.Now()
err := r.ListAndWatch(stopCh) err := r.ListAndWatch(stopCh)
@ -471,12 +471,12 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
} }
r := &Reflector{ r := &Reflector{
name: "test-reflector", name: "test-reflector",
listerWatcher: lw, listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm, backoffManager: bm,
clock: clock, clock: clock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
} }
stopCh := make(chan struct{}) stopCh := make(chan struct{})
@ -540,12 +540,12 @@ func TestRetryInternalError(t *testing.T) {
} }
r := &Reflector{ r := &Reflector{
name: "test-reflector", name: "test-reflector",
listerWatcher: lw, listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm, backoffManager: bm,
clock: fakeClock, clock: fakeClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
} }
r.MaxInternalErrorRetryDuration = tc.maxInternalDuration r.MaxInternalErrorRetryDuration = tc.maxInternalDuration