diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index e995abe2..9c9a758f 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -69,6 +69,8 @@ type Reflector struct { // backoff manages backoff of ListWatch backoffManager wait.BackoffManager + // initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch. + initConnBackoffManager wait.BackoffManager resyncPeriod time.Duration // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked @@ -166,10 +168,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. - backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), - resyncPeriod: resyncPeriod, - clock: realClock, - watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), + initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), + resyncPeriod: resyncPeriod, + clock: realClock, + watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), } r.setExpectedType(expectedType) return r @@ -404,9 +407,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // If this is "connection refused" error, it means that most likely apiserver is not responsive. // It doesn't make sense to re-list all objects because most likely we will be able to restart // watch where we ended. - // If that's the case wait and resend watch request. + // If that's the case begin exponentially backing off and resend watch request. if utilnet.IsConnectionRefused(err) { - time.Sleep(time.Second) + <-r.initConnBackoffManager.Backoff().C() continue } return err diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 5394e668..9260d5a6 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -22,15 +22,17 @@ import ( "math/rand" "reflect" "strconv" + "syscall" "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" ) @@ -358,6 +360,82 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { } } +func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { + maxBackoff := 50 * time.Millisecond + table := []struct { + numConnFails int + expLowerBound time.Duration + expUpperBound time.Duration + }{ + {5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially + {40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff}, // case where maxBoff is hit, backoff time should flatten + + } + for _, test := range table { + t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1< maxBackoff*2 { + step = maxBackoff * 2 + } + fakeClock.Step(step) + i++ + } + time.Sleep(100 * time.Microsecond) + } + }() + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if connFails > 0 { + connFails-- + return nil, syscall.ECONNREFUSED + } + close(stopCh) + return watch.NewFake(), nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil + }, + } + r := &Reflector{ + name: "test-reflector", + listerWatcher: lw, + store: NewFIFO(MetaNamespaceKeyFunc), + initConnBackoffManager: bm, + clock: fakeClock, + watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + } + start := fakeClock.Now() + err := r.ListAndWatch(stopCh) + elapsed := fakeClock.Since(start) + if err != nil { + t.Errorf("unexpected error %v", err) + } + if elapsed < (test.expLowerBound) { + t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed) + } + if elapsed > (test.expUpperBound) { + t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed) + } + }) + } +} + func TestReflectorResync(t *testing.T) { iteration := 0 stopCh := make(chan struct{})