From a9daa46d470e87ed328688bebd9314cc5ac0fc4a Mon Sep 17 00:00:00 2001 From: wojtekt Date: Sat, 13 Mar 2021 12:29:44 +0100 Subject: [PATCH] Handle 429 errors from watch requests in reflector library. --- .../k8s.io/client-go/tools/cache/reflector.go | 7 ++- .../client-go/tools/cache/reflector_test.go | 53 +++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 360d7304b7f..c732c782207 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -417,7 +417,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // It doesn't make sense to re-list all objects because most likely we will be able to restart // watch where we ended. // If that's the case begin exponentially backing off and resend watch request. - if utilnet.IsConnectionRefused(err) { + // Do the same for "429" errors. + if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) { <-r.initConnBackoffManager.Backoff().C() continue } @@ -432,6 +433,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + case apierrors.IsTooManyRequests(err): + klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) + <-r.initConnBackoffManager.Backoff().C() + continue default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 6a67e59cc74..fa3b638c0db 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -436,6 +436,59 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) { } } +type fakeBackoff struct { + clock clock.Clock + calls int +} + +func (f *fakeBackoff) Backoff() clock.Timer { + f.calls++ + return f.clock.NewTimer(time.Duration(0)) +} + +func TestBackoffOnTooManyRequests(t *testing.T) { + err := apierrors.NewTooManyRequests("too many requests", 1) + clock := &clock.RealClock{} + bm := &fakeBackoff{clock: clock} + + lw := &testLW{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + switch bm.calls { + case 0: + return nil, err + case 1: + w := watch.NewFakeWithChanSize(1, false) + status := err.Status() + w.Error(&status) + return w, nil + default: + w := watch.NewFake() + w.Stop() + return w, nil + } + }, + } + + r := &Reflector{ + name: "test-reflector", + listerWatcher: lw, + store: NewFIFO(MetaNamespaceKeyFunc), + initConnBackoffManager: bm, + clock: clock, + watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + } + + stopCh := make(chan struct{}) + r.ListAndWatch(stopCh) + close(stopCh) + if bm.calls != 2 { + t.Errorf("unexpected watch backoff calls: %d", bm.calls) + } +} + func TestReflectorResync(t *testing.T) { iteration := 0 stopCh := make(chan struct{})