From a8ef6e9f0104a44023162bb8229fb677ec80beb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 29 Apr 2024 14:19:46 +0200 Subject: [PATCH 1/2] Implement ResilientWatchCacheInitialization --- pkg/features/kube_features.go | 2 + .../apiserver/pkg/features/kube_features.go | 9 + .../apiserver/pkg/storage/cacher/cacher.go | 78 ++++-- .../pkg/storage/cacher/cacher_test.go | 15 +- .../storage/cacher/cacher_whitebox_test.go | 242 +++++++++++++----- .../client-go/tools/cache/reflector_test.go | 48 +++- 6 files changed, 306 insertions(+), 88 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index cb57a4d8776..43b9ae7c4fe 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -1235,6 +1235,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS genericfeatures.RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, + genericfeatures.ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta}, + genericfeatures.SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta}, genericfeatures.ServerSideApply: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29 diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index c73063a2e39..c25296590a5 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -173,6 +173,13 @@ const ( // to a chunking list request. RemainingItemCount featuregate.Feature = "RemainingItemCount" + // owner: @wojtek-t + // beta: v1.31 + // + // Enables resilient watchcache initialization to avoid controlplane + // overload. 
+ ResilientWatchCacheInitialization featuregate.Feature = "ResilientWatchCacheInitialization" + // owner: @serathius // beta: v1.30 // @@ -353,6 +360,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS RemainingItemCount: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32 + ResilientWatchCacheInitialization: {Default: true, PreRelease: featuregate.Beta}, + RetryGenerateName: {Default: true, PreRelease: featuregate.Beta}, SeparateCacheWatchRPC: {Default: true, PreRelease: featuregate.Beta}, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index bbb510fcb73..bfe34801e59 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -532,9 +532,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return nil, err } - readyGeneration, err := c.ready.waitAndReadGeneration(ctx) - if err != nil { - return nil, errors.NewServiceUnavailable(err.Error()) + var readyGeneration int + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + var ok bool + readyGeneration, ok = c.ready.checkAndReadGeneration() + if !ok { + return nil, errors.NewTooManyRequests("storage is (re)initializing", 1) + } + } else { + readyGeneration, err = c.ready.waitAndReadGeneration(ctx) + if err != nil { + return nil, errors.NewServiceUnavailable(err.Error()) + } } // determine the namespace and name scope of the watch, first from the request, secondarily from the field selector @@ -676,6 +685,14 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o return c.storage.Get(ctx, key, opts, objPtr) } + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if !c.ready.check() { + // If Cache is not initialized, delegate Get requests to storage + // as 
described in https://kep.k8s.io/4568 + return c.storage.Get(ctx, key, opts, objPtr) + } + } + // If resourceVersion is specified, serve it from cache. // It's guaranteed that the returned value is at least that // fresh as the given resourceVersion. @@ -684,16 +701,18 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o return err } - if getRV == 0 && !c.ready.check() { - // If Cacher is not yet initialized and we don't require any specific - // minimal resource version, simply forward the request to storage. - return c.storage.Get(ctx, key, opts, objPtr) - } - // Do not create a trace - it's not for free and there are tons // of Get requests. We can add it if it will be really needed. - if err := c.ready.wait(ctx); err != nil { - return errors.NewServiceUnavailable(err.Error()) + + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if getRV == 0 && !c.ready.check() { + // If Cacher is not yet initialized and we don't require any specific + // minimal resource version, simply forward the request to storage. 
+ return c.storage.Get(ctx, key, opts, objPtr) + } + if err := c.ready.wait(ctx); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } } objVal, err := conversion.EnforcePtr(objPtr) @@ -743,6 +762,14 @@ func shouldDelegateList(opts storage.ListOptions) bool { return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch } +func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { + pred := opts.Predicate + noLabelSelector := pred.Label == nil || pred.Label.Empty() + noFieldSelector := pred.Field == nil || pred.Field.Empty() + hasLimit := pred.Limit > 0 + return noLabelSelector && noFieldSelector && hasLimit +} + func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) { if !recursive { obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key) @@ -770,10 +797,19 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio if err != nil { return err } - if listRV == 0 && !c.ready.check() { - // If Cacher is not yet initialized and we don't require any specific - // minimal resource version, simply forward the request to storage. - return c.storage.GetList(ctx, key, opts, listObj) + + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) { + // If Cacher is not initialized, delegate List requests to storage + // as described in https://kep.k8s.io/4568 + return c.storage.GetList(ctx, key, opts, listObj) + } + } else { + if listRV == 0 && !c.ready.check() { + // If Cacher is not yet initialized and we don't require any specific + // minimal resource version, simply forward the request to storage. 
+ return c.storage.GetList(ctx, key, opts, listObj) + } + } requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported { @@ -788,8 +824,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio attribute.Stringer("type", c.groupResource)) defer span.End(500 * time.Millisecond) - if err := c.ready.wait(ctx); err != nil { - return errors.NewServiceUnavailable(err.Error()) + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if !c.ready.check() { + // If Cacher is not initialized, reject List requests + // as described in https://kep.k8s.io/4568 + return errors.NewTooManyRequests("storage is (re)initializing", 1) + } + } else { + if err := c.ready.wait(ctx); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } } span.AddEvent("Ready") diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 1b39a8d8771..cdd318a9d00 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -464,14 +464,25 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context t.Fatalf("Failed to inject list errors: %v", err) } + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + // The tests assume that Get/GetList/Watch calls shouldn't fail. + // However, 429 error can now be returned if watchcache is under initialization. + // To avoid rewriting all tests, we wait for watchcache to initialize. 
+ if err := cacher.ready.wait(ctx); err != nil { + t.Fatal(err) + } + } + return ctx, cacher, server, terminate } func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) { _, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) - if err := cacher.ready.wait(context.TODO()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.TODO()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } return &createWrapper{Cacher: cacher}, tearDown } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index cc34ced7fcf..0407fafbec8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -53,7 +53,7 @@ import ( "k8s.io/utils/pointer" ) -func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { +func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versioner, error) { prefix := "pods" config := Config{ Storage: s, @@ -79,9 +79,27 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { Clock: clock.RealClock{}, } cacher, err := NewCacherFromConfig(config) + return cacher, storage.APIObjectVersioner{}, err } +func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { + cacher, versioner, err := newTestCacherWithoutSyncing(s) + if err != nil { + return nil, versioner, err + } + + if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + // The tests assume that Get/GetList/Watch calls shouldn't fail. + // However, 429 error can now be returned if watchcache is under initialization. 
+ // To avoid rewriting all tests, we wait for watchcache to initialize. + if err := cacher.ready.wait(context.Background()); err != nil { + return nil, storage.APIObjectVersioner{}, err + } + } + return cacher, versioner, nil +} + type dummyStorage struct { sync.RWMutex err error @@ -222,10 +240,12 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp result := &example.PodList{} - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } + // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { currentResourceVersion := "42" @@ -267,9 +287,10 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { } result := &example.PodList{} - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } // Inject error to underlying layer and check if cacher is not bypassed. @@ -301,9 +322,10 @@ func TestGetCacheBypass(t *testing.T) { result := &example.Pod{} - // Wait until cacher is initialized. 
- if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } // Inject error to underlying layer and check if cacher is not bypassed. @@ -333,9 +355,10 @@ func TestWatchCacheBypass(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ @@ -375,6 +398,43 @@ func TestWatchCacheBypass(t *testing.T) { } } +func TestTooManyRequestsNotReturned(t *testing.T) { + // Ensure that with ResilientWatchCacheInitialization feature disabled, we don't return 429 + // errors when watchcache is not initialized. + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResilientWatchCacheInitialization, false) + + dummyErr := fmt.Errorf("dummy") + backingStorage := &dummyStorage{err: dummyErr} + cacher, _, err := newTestCacherWithoutSyncing(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + opts := storage.ListOptions{ + ResourceVersion: "0", + Predicate: storage.Everything, + } + + // Cancel the request so that it doesn't hang forever. 
+ listCtx, listCancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer listCancel() + + result := &example.PodList{} + err = cacher.GetList(listCtx, "/pods/ns", opts, result) + if err != nil && apierrors.IsTooManyRequests(err) { + t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for List") + } + + watchCtx, watchCancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer watchCancel() + + _, err = cacher.Watch(watchCtx, "/pods/ns", opts) + if err != nil && apierrors.IsTooManyRequests(err) { + t.Errorf("Unexpected 429 error without ResilientWatchCacheInitialization feature for Watch") + } +} + func TestEmptyWatchEventCache(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) @@ -471,7 +531,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) { // constantly failing lists to the underlying storage. dummyErr := fmt.Errorf("dummy") backingStorage := &dummyStorage{err: dummyErr} - cacher, _, err := newTestCacher(backingStorage) + cacher, _, err := newTestCacherWithoutSyncing(backingStorage) if err != nil { t.Fatalf("Couldn't create cacher: %v", err) } @@ -489,8 +549,14 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) { // Ensure that it terminates when its context is cancelled // (e.g. the request is terminated for whatever reason). 
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0"}) - if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() { - t.Errorf("Unexpected error: %#v", err) + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err == nil || err.Error() != apierrors.NewServiceUnavailable(context.Canceled.Error()).Error() { + t.Errorf("Unexpected error: %#v", err) + } + } else { + if err == nil || err.Error() != apierrors.NewTooManyRequests("storage is (re)initializing", 1).Error() { + t.Errorf("Unexpected error: %#v", err) + } } } @@ -502,9 +568,10 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } // Ensure there is some budget for slowing down processing. @@ -588,9 +655,10 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } - // Wait until cacher is initialized. 
- if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) @@ -623,17 +691,32 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) { IgnoreNotFound: true, ResourceVersion: "1", }, result) - if err == nil { - t.Fatalf("Success to create Get: %v", err) + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err == nil { + t.Fatalf("Success to create Get: %v", err) + } + } else { + if err != nil { + t.Fatalf("Failed to get object: %v:", err) + } } listResult := &example.PodList{} err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "1", Recursive: true, + Predicate: storage.SelectionPredicate{ + Limit: 500, + }, }, listResult) - if err == nil { - t.Fatalf("Success to create GetList: %v", err) + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err == nil { + t.Fatalf("Success to create GetList: %v", err) + } + } else { + if err != nil { + t.Fatalf("Failed to list objects: %v", err) + } } select { @@ -762,10 +845,12 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. 
- if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } + pred := storage.Everything pred.AllowWatchBookmarks = true @@ -841,9 +926,10 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo } defer cacher.Stop() - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } pred := storage.Everything pred.AllowWatchBookmarks = allowWatchBookmarks @@ -941,9 +1027,10 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { // resolution how frequency we recompute. cacher.bookmarkWatchers.bookmarkFrequency = time.Second - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } pred := storage.Everything pred.AllowWatchBookmarks = true @@ -1011,9 +1098,10 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. 
- if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } // Ensure there is some budget for slowing down processing. @@ -1089,9 +1177,10 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) { // Ensure that bookmarks are sent more frequently than every 1m. cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second) - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } makePod := func(i int) *examplev1.Pod { @@ -1167,9 +1256,10 @@ func TestStartingResourceVersion(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } // Ensure there is some budget for slowing down processing. @@ -1247,9 +1337,10 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. 
- if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } // Ensure there is some budget for slowing down processing. @@ -1389,9 +1480,10 @@ func TestCachingDeleteEvents(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } fooPredicate := storage.SelectionPredicate{ @@ -1471,9 +1563,10 @@ func testCachingObjects(t *testing.T, watchersCount int) { } defer cacher.Stop() - // Wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } dispatchedEvents := []*watchCacheEvent{} @@ -1567,10 +1660,12 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { } defer cacher.Stop() - // Wait until cacher is initialized. 
- if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } + // Ensure there is enough budget for slow processing since // the entire watch cache is going to be served through the // interval and events won't be popped from the cacheWatcher's @@ -1754,8 +1849,11 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { t.Fatalf("Couldn't create cacher: %v", err) } defer cacher.Stop() - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } w, err := cacher.Watch(context.Background(), "pods/ns", scenario.opts) @@ -1911,9 +2009,10 @@ func TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived(t *testing.T) { require.NoError(t, err, "failed to create cacher") defer cacher.Stop() - // wait until cacher is initialized. - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } pred := storage.Everything @@ -1942,9 +2041,10 @@ func TestForgetWatcher(t *testing.T) { require.NoError(t, err) defer cacher.Stop() - // wait until cacher is initialized. 
- if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } assertCacherInternalState := func(expectedWatchersCounter, expectedValueWatchersCounter int) { @@ -2334,8 +2434,11 @@ func TestGetBookmarkAfterResourceVersionLockedFunc(t *testing.T) { require.NoError(t, err, "couldn't create cacher") defer cacher.Stop() - if err := cacher.ready.wait(context.Background()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } cacher.watchCache.UpdateResourceVersion(fmt.Sprintf("%d", scenario.watchCacheResourceVersion)) @@ -2395,8 +2498,11 @@ func TestWatchStreamSeparation(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC) _, cacher, _, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - if err := cacher.ready.wait(context.TODO()); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") + + if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) { + if err := cacher.ready.wait(context.TODO()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } } getCacherRV := func() uint64 { diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 84a8d2697f2..265ba9a7e2a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ 
b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -507,13 +507,59 @@ func TestBackoffOnTooManyRequests(t *testing.T) { } stopCh := make(chan struct{}) - r.ListAndWatch(stopCh) + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } close(stopCh) if bm.calls != 2 { t.Errorf("unexpected watch backoff calls: %d", bm.calls) } } +func TestNoRelistOnTooManyRequests(t *testing.T) { + err := apierrors.NewTooManyRequests("too many requests", 1) + clock := &clock.RealClock{} + bm := &fakeBackoff{clock: clock} + listCalls, watchCalls := 0, 0 + + lw := &testLW{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + listCalls++ + return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + watchCalls++ + if watchCalls < 5 { + return nil, err + } + w := watch.NewFake() + w.Stop() + return w, nil + }, + } + + r := &Reflector{ + name: "test-reflector", + listerWatcher: lw, + store: NewFIFO(MetaNamespaceKeyFunc), + backoffManager: bm, + clock: clock, + watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + } + + stopCh := make(chan struct{}) + if err := r.ListAndWatch(stopCh); err != nil { + t.Fatal(err) + } + close(stopCh) + if listCalls != 1 { + t.Errorf("unexpected list calls: %d", listCalls) + } + if watchCalls != 5 { + t.Errorf("unexpected watch calls: %d", watchCalls) + } +} + func TestRetryInternalError(t *testing.T) { testCases := []struct { name string From 2854d84056e9559741b5575cd556316f679c6d12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 6 Jun 2024 15:59:03 +0200 Subject: [PATCH 2/2] Fix ChangeCRD test --- .../test/integration/change_test.go | 118 +++++++++++------- 1 file changed, 70 insertions(+), 48 deletions(-) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go index 
b0ddb00da21..59811f57c24 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/change_test.go @@ -59,9 +59,74 @@ func TestChangeCRD(t *testing.T) { ns := "default" noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1") - stopChan := make(chan struct{}) + updateCRD := func() { + noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{}) + if err != nil { + t.Error(err) + return + } + if len(noxuDefinitionToUpdate.Spec.Versions) == 1 { + v2 := noxuDefinitionToUpdate.Spec.Versions[0] + v2.Name = "v2" + v2.Served = true + v2.Storage = false + noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2) + } else { + noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1] + } + if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) { + t.Error(err) + } + } + // Set up 10 watchers for custom resource. + // We can't exercise them in a loop the same way as get requests, as watchcache + // can reject them with 429 and Retry-After: 1 if it is uninitialized and even + // though 429 is automatically retried, with frequent watchcache terminations and + // reinitializations they could either end up being rejected N times and fail or + // not initialize until the last watchcache reinitialization and then not be + // terminated. Thus we exercise their termination explicitly at the beginning. 
wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Errorf("unexpected error establishing watch: %v", err) + return + } + for event := range w.ResultChan() { + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + // all expected + default: + t.Errorf("unexpected watch event: %#v", event) + } + } + }(i) + } + + // Let all the established watches soak + time.Sleep(5 * time.Second) + + // Update CRD and ensure that all watches are gracefully terminated. + updateCRD() + + drained := make(chan struct{}) + go func() { + defer close(drained) + wg.Wait() + }() + + select { + case <-drained: + case <-time.After(wait.ForeverTestTimeout): + t.Fatal("timed out waiting for watchers to be terminated") + } + + stopChan := make(chan struct{}) // Set up loop to modify CRD in the background wg.Add(1) @@ -76,28 +141,11 @@ func TestChangeCRD(t *testing.T) { time.Sleep(10 * time.Millisecond) - noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{}) - if err != nil { - t.Error(err) - continue - } - if len(noxuDefinitionToUpdate.Spec.Versions) == 1 { - v2 := noxuDefinitionToUpdate.Spec.Versions[0] - v2.Name = "v2" - v2.Served = true - v2.Storage = false - noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2) - } else { - noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1] - } - if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) { - t.Error(err) - continue - } + updateCRD() } }() - // Set up 10 loops creating and reading and watching custom resources + // Set up 10 loops creating and reading 
custom resources for i := 0; i < 10; i++ { wg.Add(1) go func(i int) { @@ -120,32 +168,6 @@ func TestChangeCRD(t *testing.T) { } } }(i) - - wg.Add(1) - go func(i int) { - defer wg.Done() - for { - time.Sleep(10 * time.Millisecond) - select { - case <-stopChan: - return - default: - w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{}) - if err != nil { - t.Errorf("unexpected error establishing watch: %v", err) - continue - } - for event := range w.ResultChan() { - switch event.Type { - case watch.Added, watch.Modified, watch.Deleted: - // all expected - default: - t.Errorf("unexpected watch event: %#v", event) - } - } - } - } - }(i) } // Let all the established get request loops soak @@ -155,7 +177,7 @@ func TestChangeCRD(t *testing.T) { close(stopChan) // Let loops drain - drained := make(chan struct{}) + drained = make(chan struct{}) go func() { defer close(drained) wg.Wait() @@ -164,6 +186,6 @@ func TestChangeCRD(t *testing.T) { select { case <-drained: case <-time.After(wait.ForeverTestTimeout): - t.Error("timed out waiting for clients to complete") + t.Fatal("timed out waiting for clients to complete") } }