diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go
index cd4e2f5b..9cd476be 100644
--- a/tools/cache/reflector.go
+++ b/tools/cache/reflector.go
@@ -71,6 +71,8 @@ type Reflector struct {
 	backoffManager wait.BackoffManager
 	// initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch.
 	initConnBackoffManager wait.BackoffManager
+	// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
+	MaxInternalErrorRetryDuration time.Duration
 
 	resyncPeriod time.Duration
 	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
@@ -287,6 +289,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 		}
 	}()
 
+	retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
 	for {
 		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
 		select {
@@ -323,7 +326,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 			return err
 		}
 
-		if err := watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh); err != nil {
+		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
+		retry.After(err)
+		if err != nil {
 			if err != errorStopRequested {
 				switch {
 				case isExpiredError(err):
@@ -335,6 +340,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 					klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
 					<-r.initConnBackoffManager.Backoff().C()
 					continue
+				case apierrors.IsInternalError(err) && retry.ShouldRetry():
+					klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
+					continue
 				default:
 					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
 				}
diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go
index cc79b2b7..eeb4affb 100644
--- a/tools/cache/reflector_test.go
+++ b/tools/cache/reflector_test.go
@@ -487,6 +487,79 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
 	}
 }
 
+func TestRetryInternalError(t *testing.T) {
+	testCases := []struct {
+		name                string
+		maxInternalDuration time.Duration
+		rewindTime          int
+		wantRetries         int
+	}{
+		{
+			name:                "retries off",
+			maxInternalDuration: time.Duration(0),
+			wantRetries:         0,
+		},
+		{
+			name:                "retries on, all calls fail",
+			maxInternalDuration: time.Second * 30,
+			wantRetries:         31,
+		},
+		{
+			name:                "retries on, one call successful",
+			maxInternalDuration: time.Second * 30,
+			rewindTime:          10,
+			wantRetries:         40,
+		},
+	}
+
+	for _, tc := range testCases {
+		err := apierrors.NewInternalError(fmt.Errorf("etcdserver: no leader"))
+		fakeClock := testingclock.NewFakeClock(time.Now())
+		bm := &fakeBackoff{clock: fakeClock}
+
+		counter := 0
+
+		lw := &testLW{
+			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
+			},
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				counter = counter + 1
+				t.Logf("Counter: %v", counter)
+				if counter == tc.rewindTime {
+					t.Logf("Rewinding")
+					fakeClock.Step(time.Minute)
+				}
+
+				fakeClock.Step(time.Second)
+				w := watch.NewFakeWithChanSize(1, false)
+				status := err.Status()
+				w.Error(&status)
+				return w, nil
+			},
+		}
+
+		r := &Reflector{
+			name:                   "test-reflector",
+			listerWatcher:          lw,
+			store:                  NewFIFO(MetaNamespaceKeyFunc),
+			initConnBackoffManager: bm,
+			clock:                  fakeClock,
+			watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
+		}
+
+		r.MaxInternalErrorRetryDuration = tc.maxInternalDuration
+
+		stopCh := make(chan struct{})
+		r.ListAndWatch(stopCh)
+		close(stopCh)
+
+		if counter-1 != tc.wantRetries {
+			t.Errorf("%v unexpected number of retries: %d", tc, counter-1)
+		}
+	}
+}
+
 func TestReflectorResync(t *testing.T) {
 	iteration := 0
 	stopCh := make(chan struct{})
diff --git a/tools/cache/retry_with_deadline.go b/tools/cache/retry_with_deadline.go
new file mode 100644
index 00000000..8201fb15
--- /dev/null
+++ b/tools/cache/retry_with_deadline.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"k8s.io/utils/clock"
+	"time"
+)
+
+type RetryWithDeadline interface {
+	After(error)
+	ShouldRetry() bool
+}
+
+type retryWithDeadlineImpl struct {
+	firstErrorTime   time.Time
+	lastErrorTime    time.Time
+	maxRetryDuration time.Duration
+	minResetPeriod   time.Duration
+	isRetryable      func(error) bool
+	clock            clock.Clock
+}
+
+func NewRetryWithDeadline(maxRetryDuration, minResetPeriod time.Duration, isRetryable func(error) bool, clock clock.Clock) RetryWithDeadline {
+	return &retryWithDeadlineImpl{
+		firstErrorTime:   time.Time{},
+		lastErrorTime:    time.Time{},
+		maxRetryDuration: maxRetryDuration,
+		minResetPeriod:   minResetPeriod,
+		isRetryable:      isRetryable,
+		clock:            clock,
+	}
+}
+
+func (r *retryWithDeadlineImpl) reset() {
+	r.firstErrorTime = time.Time{}
+	r.lastErrorTime = time.Time{}
+}
+
+func (r *retryWithDeadlineImpl) After(err error) {
+	if r.isRetryable(err) {
+		if r.clock.Now().Sub(r.lastErrorTime) >= r.minResetPeriod {
+			r.reset()
+		}
+
+		if r.firstErrorTime.IsZero() {
+			r.firstErrorTime = r.clock.Now()
+		}
+		r.lastErrorTime = r.clock.Now()
+	}
+}
+
+func (r *retryWithDeadlineImpl) ShouldRetry() bool {
+	if r.maxRetryDuration <= time.Duration(0) {
+		return false
+	}
+
+	if r.clock.Now().Sub(r.firstErrorTime) <= r.maxRetryDuration {
+		return true
+	}
+
+	r.reset()
+	return false
+}
diff --git a/tools/cache/retry_with_deadline_test.go b/tools/cache/retry_with_deadline_test.go
new file mode 100644
index 00000000..337f9227
--- /dev/null
+++ b/tools/cache/retry_with_deadline_test.go
@@ -0,0 +1,143 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	testingclock "k8s.io/utils/clock/testing"
+)
+
+type retryScenarioStep struct {
+	clockStep time.Duration
+	err       error
+	wantRetry bool
+}
+
+func TestRetryWithDeadline(t *testing.T) {
+	internalError := apierrors.NewInternalError(fmt.Errorf("etcdserver: no leader"))
+	otherError := fmt.Errorf("some other error")
+
+	testCases := []struct {
+		name        string
+		duration    time.Duration
+		reset       time.Duration
+		isRetryable func(error) bool
+		scenario    []retryScenarioStep
+	}{
+		{
+			name:        "Never retry when duration is zero",
+			duration:    time.Duration(0),
+			reset:       time.Second * 30,
+			isRetryable: func(err error) bool { return false },
+			scenario: []retryScenarioStep{
+				{
+					clockStep: time.Second * 1,
+					err:       nil,
+					wantRetry: false,
+				},
+				{
+					clockStep: time.Second * 0,
+					err:       internalError,
+					wantRetry: false,
+				},
+				{
+					clockStep: time.Second * 1,
+					err:       otherError,
+					wantRetry: false,
+				},
+			},
+		},
+		{
+			name:        "Retry when internal error happens only within duration",
+			duration:    time.Second * 1,
+			reset:       time.Second * 30,
+			isRetryable: apierrors.IsInternalError,
+			scenario: []retryScenarioStep{
+				{
+					clockStep: time.Second * 1,
+					err:       internalError,
+					wantRetry: true,
+				},
+				{
+					clockStep: time.Second * 1,
+					err:       internalError,
+					wantRetry: true,
+				},
+				{
+					clockStep: time.Second * 1,
+					err:       internalError,
+					wantRetry: false,
+				},
+			},
+		},
+		{
+			name:        "Don't retry when other error happens",
+			duration:    time.Second * 1,
+			reset:       time.Second * 30,
+			isRetryable: apierrors.IsInternalError,
+			scenario: []retryScenarioStep{
+				{
+					clockStep: time.Second * 1,
+					err:       otherError,
+					wantRetry: false,
+				},
+			},
+		},
+		{
+			name:        "Ignore other errors for retries",
+			duration:    time.Second * 1,
+			reset:       time.Second * 30,
+			isRetryable: apierrors.IsInternalError,
+			scenario: []retryScenarioStep{
+				{
+					clockStep: time.Second * 1,
+					err:       internalError,
+					wantRetry: true,
+				},
+				{
+					clockStep: time.Second * 0,
+					err:       otherError,
+					wantRetry: true,
+				},
+				{
+					clockStep: time.Second * 1,
+					err:       internalError,
+					wantRetry: true,
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		fakeClock := testingclock.NewFakeClock(time.Now())
+		retry := NewRetryWithDeadline(tc.duration, tc.reset, tc.isRetryable, fakeClock)
+
+		for i, step := range tc.scenario {
+			fakeClock.Step(step.clockStep)
+			retry.After(step.err)
+			result := retry.ShouldRetry()
+			if result != step.wantRetry {
+				t.Errorf("%v unexpected retry, step %d, result %v want %v", tc, i, result, step.wantRetry)
+				break
+			}
+		}
+	}
+}
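
Usage note (not part of the patch): the sketch below shows how a client-go consumer could opt in to the new behaviour once this lands, by setting the exported MaxInternalErrorRetryDuration field on a Reflector built with the existing NewReflector constructor. The helper name startPodReflector, the Pod ListerWatcher wiring, and the 30-second budget are illustrative assumptions, not anything this change prescribes.

package main

import (
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// startPodReflector is an illustrative helper, not part of this change.
func startPodReflector(client kubernetes.Interface, stopCh <-chan struct{}) {
	// Standard reflector wiring: list/watch Pods in all namespaces into a plain Store.
	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)

	// Opt in to the new field: on errors matching apierrors.IsInternalError
	// (e.g. "etcdserver: no leader"), keep the watch loop going from the last
	// resource version for up to 30s (an arbitrary example value) instead of
	// returning from ListAndWatch and falling back to a full re-list.
	r.MaxInternalErrorRetryDuration = 30 * time.Second

	go r.Run(stopCh)
}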