Merge pull request #100878 from wojtek-t/backoff_watches_on_429

Handle 429 errors from watch requests in reflector library.
This commit is contained in:
Kubernetes Prow Robot 2021-04-10 19:04:54 -07:00 committed by GitHub
commit 860dfe112e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 1 deletions

View File

@ -417,7 +417,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
if utilnet.IsConnectionRefused(err) {
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
@ -432,6 +433,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
<-r.initConnBackoffManager.Backoff().C()
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}

View File

@ -436,6 +436,59 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
}
}
type fakeBackoff struct {
clock clock.Clock
calls int
}
func (f *fakeBackoff) Backoff() clock.Timer {
f.calls++
return f.clock.NewTimer(time.Duration(0))
}
func TestBackoffOnTooManyRequests(t *testing.T) {
err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock}
lw := &testLW{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
switch bm.calls {
case 0:
return nil, err
case 1:
w := watch.NewFakeWithChanSize(1, false)
status := err.Status()
w.Error(&status)
return w, nil
default:
w := watch.NewFake()
w.Stop()
return w, nil
}
},
}
r := &Reflector{
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm,
clock: clock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
}
stopCh := make(chan struct{})
r.ListAndWatch(stopCh)
close(stopCh)
if bm.calls != 2 {
t.Errorf("unexpected watch backoff calls: %d", bm.calls)
}
}
func TestReflectorResync(t *testing.T) {
iteration := 0
stopCh := make(chan struct{})