Implement ResilientWatchCacheInitialization

This commit is contained in:
Wojciech Tyczyński 2024-04-29 14:19:46 +02:00
parent 9fc0315ce8
commit a8ef6e9f01
6 changed files with 306 additions and 88 deletions

View File

@ -1235,6 +1235,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
genericfeatures.ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},
genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

View File

@ -173,6 +173,13 @@ const (
// to a chunking list request.
RemainingItemCount featuregate.Feature = "RemainingItemCount"
// owner: @wojtek-t
// beta: v1.31
//
// Enables resilient watchcache initialization to avoid controlplane
// overload.
ResilientWatchCacheInitialization featuregate.Feature = "ResilientWatchCacheInitialization"
// owner: @serathius
// beta: v1.30
//
@ -353,6 +360,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta},
RetryGenerateName: {Default: true, PreRelease: featuregate.Beta},
SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta},

View File

@ -532,10 +532,19 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return nil, err
}
readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
var readyGeneration int
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
var ok bool
readyGeneration, ok = c.ready.checkAndReadGeneration()
if !ok {
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
}
} else {
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
if err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}
}
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
scope := namespacedName{}
@ -676,6 +685,14 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return c.storage.Get(ctx, key, opts, objPtr)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
// If Cache is not initialized, delegate Get requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.Get(ctx, key, opts, objPtr)
}
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
@ -684,17 +701,19 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return err
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.Get(ctx, key, opts, objPtr)
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
}
objVal, err := conversion.EnforcePtr(objPtr)
if err != nil {
@ -743,6 +762,14 @@ func shouldDelegateList(opts storage.ListOptions) bool {
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
}
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
pred := opts.Predicate
noLabelSelector := pred.Label == nil || pred.Label.Empty()
noFieldSelector := pred.Field == nil || pred.Field.Empty()
hasLimit := pred.Limit > 0
return noLabelSelector && noFieldSelector && hasLimit
}
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
if !recursive {
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
@ -770,11 +797,20 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if err != nil {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
// If Cacher is not initialized, delegate List requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.GetList(ctx, key, opts, listObj)
}
} else {
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
}
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
@ -788,9 +824,17 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
attribute.Stringer("type", c.groupResource))
defer span.End(500 * time.Millisecond)
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
// If Cacher is not initialized, reject List requests
// as described in https://kep.k8s.io/4568
return errors.NewTooManyRequests("storage is (re)initializing", 1)
}
} else {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
}
span.AddEvent("Ready")
// List elements with at least 'listRV' from cache.

View File

@ -464,15 +464,26 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context
t.Fatalf("Failed to inject list errors: %v", err)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
// The tests assume that Get/GetList/Watch calls shouldn't fail.
// However, 429 error can now be returned if watchcache is under initialization.
// To avoid rewriting all tests, we wait for watcache to initialize.
if err := cacher.ready.wait(ctx); err != nil {
t.Fatal(err)
}
}
return ctx, cacher, server, terminate
}
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
return &createWrapper{Cacher: cacher}, tearDown
}

View File

@ -53,7 +53,7 @@ import (
"k8s.io/utils/pointer"
)
func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versioner, error) {
prefix := "pods"
config := Config{
Storage: s,
@ -79,9 +79,27 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
Clock: clock.RealClock{},
}
cacher, err := NewCacherFromConfig(config)
return cacher, storage.APIObjectVersioner{}, err
}
func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
cacher, versioner, err := newTestCacherWithoutSyncing(s)
if err != nil {
return nil, versioner, err
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
// The tests assume that Get/GetList/Watch calls shouldn't fail.
// However, 429 error can now be returned if watchcache is under initialization.
// To avoid rewriting all tests, we wait for watcache to initialize.
if err := cacher.ready.wait(context.Background()); err != nil {
return nil, storage.APIObjectVersioner{}, err
}
}
return cacher, versioner, nil
}
type dummyStorage struct {
sync.RWMutex
err error
@ -222,10 +240,12 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
result := &example.PodList{}
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
currentResourceVersion := "42"
@ -267,10 +287,11 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
}
result := &example.PodList{}
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
@ -301,10 +322,11 @@ func TestGetCacheBypass(t *testing.T) {
result := &example.Pod{}
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
@ -333,10 +355,11 @@ func TestWatchCacheBypass(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "0",
@ -375,6 +398,43 @@ func TestWatchCacheBypass(t *testing.T) {
}
}
func TestTooManyRequestsNotReturned(t *testing.T) {
// Ensure that with ResilientWatchCacheInitialization feature disabled, we don't return 429
// errors when watchcache is not initialized.
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResilientWatchCacheInitialization, false)
dummyErr := fmt.Errorf("dummy")
backingStorage := &dummyStorage{err: dummyErr}
cacher, _, err := newTestCacherWithoutSyncing(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
opts := storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
}
// Cancel the request so that it doesn't hang forever.
listCtx, listCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer listCancel()
result := &example.PodList{}
err = cacher.GetList(listCtx, "/pods/ns", opts, result)
if err != nil && apierrors.IsTooManyRequests(err) {
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List")
}
watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer watchCancel()
_, err = cacher.Watch(watchCtx, "/pods/ns", opts)
if err != nil && apierrors.IsTooManyRequests(err) {
t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch")
}
}
func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
@ -471,7 +531,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
// constantly failing lists to the underlying storage.
dummyErr := fmt.Errorf("dummy")
backingStorage := &dummyStorage{err: dummyErr}
cacher, _, err := newTestCacher(backingStorage)
cacher, _, err := newTestCacherWithoutSyncing(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
@ -489,9 +549,15 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
// Ensure that it terminates when its context is cancelled
// (e.g. the request is terminated for whatever reason).
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"})
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() {
t.Errorf("Unexpected error: %#v", err)
}
} else {
if err == nil || err.Error() != apierrors.NewTooManyRequests("storage is (re)initializing", 1).Error() {
t.Errorf("Unexpected error: %#v", err)
}
}
}
func TestWatcherNotGoingBackInTime(t *testing.T) {
@ -502,10 +568,11 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
@ -588,10 +655,11 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
@ -623,18 +691,33 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
IgnoreNotFound: true,
ResourceVersion: "1",
}, result)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err == nil {
t.Fatalf("Success to create Get: %v", err)
}
} else {
if err != nil {
t.Fatalf("Failed to get object: %v:", err)
}
}
listResult := &example.PodList{}
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "1",
Recursive: true,
Predicate: storage.SelectionPredicate{
Limit: 500,
},
}, listResult)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err == nil {
t.Fatalf("Success to create GetList: %v", err)
}
} else {
if err != nil {
t.Fatalf("Failed to list objects: %v", err)
}
}
select {
case <-watchClosed:
@ -762,10 +845,12 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
pred := storage.Everything
pred.AllowWatchBookmarks = true
@ -841,10 +926,11 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
pred := storage.Everything
pred.AllowWatchBookmarks = allowWatchBookmarks
@ -941,10 +1027,11 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
// resolution how frequency we recompute.
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
pred := storage.Everything
pred.AllowWatchBookmarks = true
@ -1011,10 +1098,11 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
@ -1089,10 +1177,11 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
// Ensure that bookmarks are sent more frequently than every 1m.
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
makePod := func(i int) *examplev1.Pod {
return &examplev1.Pod{
@ -1167,10 +1256,11 @@ func TestStartingResourceVersion(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Ensure there is some budget for slowing down processing.
// We use the fakeTimeBudget to prevent this test from flaking under
@ -1247,10 +1337,11 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Ensure there is some budget for slowing down processing.
// We use the fakeTimeBudget to prevent this test from flaking under
@ -1389,10 +1480,11 @@ func TestCachingDeleteEvents(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
fooPredicate := storage.SelectionPredicate{
Label: labels.SelectorFromSet(map[string]string{"foo": "true"}),
@ -1471,10 +1563,11 @@ func testCachingObjects(t *testing.T, watchersCount int) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
dispatchedEvents := []*watchCacheEvent{}
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
@ -1567,10 +1660,12 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
}
defer cacher.Stop()
// Wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
// Ensure there is enough budget for slow processing since
// the entire watch cache is going to be served through the
// interval and events won't be popped from the cacheWatcher's
@ -1754,9 +1849,12 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
w, err := cacher.Watch(context.Background(), "pods/ns", scenario.opts)
require.NoError(t, err, "failed to create watch: %v")
@ -1911,10 +2009,11 @@ func TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived(t *testing.T) {
require.NoError(t, err, "failed to create cacher")
defer cacher.Stop()
// wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
pred := storage.Everything
pred.AllowWatchBookmarks = true
@ -1942,10 +2041,11 @@ func TestForgetWatcher(t *testing.T) {
require.NoError(t, err)
defer cacher.Stop()
// wait until cacher is initialized.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
assertCacherInternalState := func(expectedWatchersCounter, expectedValueWatchersCounter int) {
cacher.Lock()
@ -2334,9 +2434,12 @@ func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) {
require.NoError(t, err, "couldn't create cacher")
defer cacher.Stop()
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
cacher.watchCache.UpdateResourceVersion(fmt.Sprintf("%d", scenario.watchCacheResourceVersion))
parsedResourceVersion := 0
@ -2395,9 +2498,12 @@ func TestWatchStreamSeparation(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)
_, cacher, _, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if err := cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
}
getCacherRV := func() uint64 {
cacher.watchCache.RLock()

View File

@ -507,13 +507,59 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
}
stopCh := make(chan struct{})
r.ListAndWatch(stopCh)
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
close(stopCh)
if bm.calls != 2 {
t.Errorf("unexpected watch backoff calls: %d", bm.calls)
}
}
func TestNoRelistOnTooManyRequests(t *testing.T) {
err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock}
listCalls, watchCalls := 0, 0
lw := &testLW{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listCalls++
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
watchCalls++
if watchCalls < 5 {
return nil, err
}
w := watch.NewFake()
w.Stop()
return w, nil
},
}
r := &Reflector{
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
clock: clock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
}
stopCh := make(chan struct{})
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
close(stopCh)
if listCalls != 1 {
t.Errorf("unexpected list calls: %d", listCalls)
}
if watchCalls != 5 {
t.Errorf("unexpected watch calls: %d", watchCalls)
}
}
func TestRetryInternalError(t *testing.T) {
testCases := []struct {
name string