Handle 429 errors from watch requests in reflector library.

Kubernetes-commit: a9daa46d470e87ed328688bebd9314cc5ac0fc4a
This commit is contained in:
wojtekt
2021-03-13 12:29:44 +01:00
committed by Kubernetes Publisher
parent 196e360f9b
commit 03ebb2739b
2 changed files with 59 additions and 1 deletions

View File

@@ -436,6 +436,59 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
}
}
type fakeBackoff struct {
clock clock.Clock
calls int
}
func (f *fakeBackoff) Backoff() clock.Timer {
f.calls++
return f.clock.NewTimer(time.Duration(0))
}
func TestBackoffOnTooManyRequests(t *testing.T) {
err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock}
lw := &testLW{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
switch bm.calls {
case 0:
return nil, err
case 1:
w := watch.NewFakeWithChanSize(1, false)
status := err.Status()
w.Error(&status)
return w, nil
default:
w := watch.NewFake()
w.Stop()
return w, nil
}
},
}
r := &Reflector{
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm,
clock: clock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
}
stopCh := make(chan struct{})
r.ListAndWatch(stopCh)
close(stopCh)
if bm.calls != 2 {
t.Errorf("unexpected watch backoff calls: %d", bm.calls)
}
}
func TestReflectorResync(t *testing.T) {
iteration := 0
stopCh := make(chan struct{})