From 4d01b584c3e1769cf94ae9a5b3af82643d1528cd Mon Sep 17 00:00:00 2001
From: Kevin Delgado
Date: Tue, 25 Aug 2020 19:15:21 +0000
Subject: [PATCH] Add exp backoff for connection refused errors

Currently, when ListAndWatch() receives a connection refused error, it is
assumed to be due to the apiserver being transiently unresponsive.

In situations where a controller is running outside the k8s cluster it is
controlling, it is more common for the controller to permanently lose its
connection to the apiserver, and it needs to exponentially back off its
retries rather than continuously spam the logs with Watch attempts that
will never succeed.

Kubernetes-commit: 1ff789f2bb9bf7fbb3df35977bc249c0dd019d31
---
 tools/cache/reflector.go      | 15 ++++---
 tools/cache/reflector_test.go | 80 ++++++++++++++++++++++++++++++++++-
 2 files changed, 88 insertions(+), 7 deletions(-)
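Note: the sketch below is not part of this patch; it only illustrates how a
wait.NewExponentialBackoffManager configured with the same parameters as the new
initConnBackoffManager paces successive waits. The main() wrapper and the fixed
number of attempts are illustrative only.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Same parameters as initConnBackoffManager: 800ms initial backoff, 30s cap,
	// reset after 2min without backoff, factor 2.0, jitter 1.0.
	bm := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, clock.RealClock{})
	for attempt := 1; attempt <= 5; attempt++ {
		start := time.Now()
		// ListAndWatch now blocks on this same call after a "connection refused"
		// error, instead of the old fixed time.Sleep(time.Second).
		<-bm.Backoff().C()
		fmt.Printf("attempt %d waited %v\n", attempt, time.Since(start))
	}
}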
diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go
index e995abe2..9c9a758f 100644
--- a/tools/cache/reflector.go
+++ b/tools/cache/reflector.go
@@ -69,6 +69,8 @@ type Reflector struct {
 
 	// backoff manages backoff of ListWatch
 	backoffManager wait.BackoffManager
+	// initConnBackoffManager manages backoff of the initial connection with the Watch call of ListAndWatch.
+	initConnBackoffManager wait.BackoffManager
 
 	resyncPeriod time.Duration
 	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
@@ -166,10 +168,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
 		// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
 		// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
 		// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
-		backoffManager:    wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
-		resyncPeriod:      resyncPeriod,
-		clock:             realClock,
-		watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
+		backoffManager:         wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
+		initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
+		resyncPeriod:           resyncPeriod,
+		clock:                  realClock,
+		watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
 	}
 	r.setExpectedType(expectedType)
 	return r
@@ -404,9 +407,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
 			// It doesn't make sense to re-list all objects because most likely we will be able to restart
 			// watch where we ended.
-			// If that's the case wait and resend watch request.
+			// If that's the case, begin exponentially backing off and resend the watch request.
 			if utilnet.IsConnectionRefused(err) {
-				time.Sleep(time.Second)
+				<-r.initConnBackoffManager.Backoff().C()
 				continue
 			}
 			return err
diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go
index 5394e668..9260d5a6 100644
--- a/tools/cache/reflector_test.go
+++ b/tools/cache/reflector_test.go
@@ -22,15 +22,17 @@ import (
 	"math/rand"
 	"reflect"
 	"strconv"
+	"syscall"
 	"testing"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 )
@@ -358,6 +360,82 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
 	}
 }
 
+func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
+	maxBackoff := 50 * time.Millisecond
+	table := []struct {
+		numConnFails  int
+		expLowerBound time.Duration
+		expUpperBound time.Duration
+	}{
+		{5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially
+		{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff},    // case where maxBackoff is hit, backoff time should flatten
+
+	}
+	for _, test := range table {
+		t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
+			func(t *testing.T) {
+				stopCh := make(chan struct{})
+				connFails := test.numConnFails
+				fakeClock := clock.NewFakeClock(time.Unix(0, 0))
+				bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
+				done := make(chan struct{})
+				defer close(done)
+				go func() {
+					i := 0
+					for {
+						select {
+						case <-done:
+							return
+						default:
+						}
+						if fakeClock.HasWaiters() {
+							step := time.Duration(1<<i) * time.Millisecond
+							if step > maxBackoff*2 {
+								step = maxBackoff * 2
+							}
+							fakeClock.Step(step)
+							i++
+						}
+						time.Sleep(100 * time.Microsecond)
+					}
+				}()
+				lw := &testLW{
+					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+						if connFails > 0 {
+							connFails--
+							return nil, syscall.ECONNREFUSED
+						}
+						close(stopCh)
+						return watch.NewFake(), nil
+					},
+					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+						return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
+					},
+				}
+				r := &Reflector{
+					name:                   "test-reflector",
+					listerWatcher:          lw,
+					store:                  NewFIFO(MetaNamespaceKeyFunc),
+					initConnBackoffManager: bm,
+					clock:                  fakeClock,
+					watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
+				}
+				start := fakeClock.Now()
+				err := r.ListAndWatch(stopCh)
+				elapsed := fakeClock.Since(start)
+				if err != nil {
+					t.Errorf("unexpected error %v", err)
+				}
+				if elapsed < (test.expLowerBound) {
+					t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
+				}
+				if elapsed > (test.expUpperBound) {
+					t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
+				}
+			})
+	}
+}
+
 func TestReflectorResync(t *testing.T) {
 	iteration := 0
 	stopCh := make(chan struct{})
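Note: the sketch below is likewise not part of this patch; it only illustrates why the
test's fake WatchFunc can return a bare syscall.ECONNREFUSED. utilnet.IsConnectionRefused
(k8s.io/apimachinery/pkg/util/net) reports true both for the bare errno and for the
wrapped form a typical HTTP round trip to an unreachable apiserver produces, and it is
the check that gates the new backoff path in ListAndWatch. The apiserver URL below is a
placeholder.

package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
	"syscall"

	utilnet "k8s.io/apimachinery/pkg/util/net"
)

func main() {
	// Bare errno, as returned by the test's fake WatchFunc.
	fmt.Println(utilnet.IsConnectionRefused(syscall.ECONNREFUSED)) // true

	// Wrapped the way a failed dial usually surfaces it from an HTTP client:
	// *url.Error -> *net.OpError -> *os.SyscallError -> ECONNREFUSED.
	wrapped := &url.Error{
		Op:  "Get",
		URL: "https://203.0.113.10:6443/api/v1/pods?watch=true", // placeholder address
		Err: &net.OpError{
			Op:  "dial",
			Err: os.NewSyscallError("connect", syscall.ECONNREFUSED),
		},
	}
	fmt.Println(utilnet.IsConnectionRefused(wrapped)) // true
}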