From d4a4a1d881ea46857afe67d3ca89d106fa59c6b1 Mon Sep 17 00:00:00 2001 From: Harish Kuna Date: Thu, 5 Jun 2025 20:38:07 +0000 Subject: [PATCH] Fix -Consistent paginated lists serve from cache --- .../storage/cacher/cacher_whitebox_test.go | 120 ++++++++++++++++++ .../apiserver/pkg/storage/cacher/delegator.go | 19 ++- 2 files changed, 134 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index e7fccdab263..6bd17feba15 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -662,6 +662,7 @@ func TestMatchExactResourceVersionFallback(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true) backingStorage := &dummyStorage{} expectStoreRequests := 0 backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { @@ -759,6 +760,125 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { } } +func TestGetListNonRecursiveCacheWithConsistentListFromCache(t *testing.T) { + // Set feature gates once at the beginning since we only care about ConsistentListFromCache=true and ListFromCacheSnapshot=false + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false) + forceRequestWatchProgressSupport(t) + + tests := []struct { + name string + consistentListFromCache bool + expectGetListCallCount int + expectGetCurrentRV bool + injectRVError bool + expectedError error + }{ + { + name: "ConsistentListFromCache enabled - served from cache", + consistentListFromCache: true, + expectGetListCallCount: 1, + expectGetCurrentRV: true, + injectRVError: false, + expectedError: nil, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var getListCount, getCurrentRVCount int + backingStorage := &dummyStorage{} + + backingStorage.getListFn = func(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + getListCount++ + if tc.injectRVError { + return errDummy + } + podList := listObj.(*example.PodList) + podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"} + return nil + } + + backingStorage.getRVFn = func(ctx context.Context) (uint64, error) { + getCurrentRVCount++ + rv := uint64(100) + err := error(nil) + if tc.injectRVError { + err = errDummy + return 0, err + } + return rv, nil + } + + cacher, v, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait for cacher to be ready before injecting errors + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready: %v", err) + } + delegator := NewCacheDelegator(cacher, backingStorage) + defer delegator.Stop() + + // Setup test object + key := "pods/ns" + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}} + if err := v.UpdateObject(input, 100); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Put object into the store + if err := cacher.watchCache.Add(input); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 500, + } + result := &example.PodList{} + + // Make the list call with empty RV - delegator will get current RV and use it + err = delegator.GetList(context.TODO(), key, storage.ListOptions{ + ResourceVersion: "", + Predicate: pred, + Recursive: true, + }, result) + + // Verify error matches expectation + if !errors.Is(err, tc.expectedError) { + t.Errorf("Expected error %v, got: %v", tc.expectedError, err) + } + + // Verify the correct storage method was called + if getListCount != tc.expectGetListCallCount { + t.Errorf("Expected GetList to be called %d times, but it was called %d times", tc.expectGetListCallCount, getListCount) + } + if tc.expectGetCurrentRV && getCurrentRVCount == 0 { + t.Error("Expected GetCurrentResourceVersion to be called, but it wasn't") + } + if !tc.expectGetCurrentRV && getCurrentRVCount > 0 { + t.Errorf("Expected GetCurrentResourceVersion not to be called, but it was called %d times", getCurrentRVCount) + } + + // For successful cache reads, verify the resource version + if err == nil { + resultRV, err := cacher.versioner.ParseResourceVersion(result.ResourceVersion) + if err != nil { + t.Fatalf("Failed to parse result resource version: %v", err) + } + expectedRV := uint64(100) + if resultRV != expectedRV { + t.Errorf("Expected RV %d but got %d", expectedRV, resultRV) + } + } + }) + } +} func TestGetCacheBypass(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go index ac17fb1c887..10d2ce4c811 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go @@ -206,6 +206,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L return c.storage.GetList(ctx, key, opts, listObj) } } + fallbackOpts := opts if result.ConsistentRead { listRV, err = c.storage.GetCurrentResourceVersion(ctx) if err != nil { @@ -213,20 +214,28 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L } // Setting resource version for consistent read in cache based on current ResourceVersion in etcd. opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10) + // If continue is not set, we need to set the resource version match to ResourceVersionMatchNotOlderThan to serve latest from cache + if opts.Predicate.Continue == "" { + opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan + } } err = c.cacher.GetList(ctx, key, opts, listObj) success := "true" fallback := "false" if err != nil { - if errors.IsResourceExpired(err) { - return c.storage.GetList(ctx, key, opts, listObj) + // ResourceExpired error occurs when attempting to list from cache with a specific resourceVersion + // that is no longer available in the cache. With ListFromCacheSnapshot feature (1.34+), we can + // serve exact resourceVersion requests from cache if available, falling back to storage only when + // the requested version is expired. + if errors.IsResourceExpired(err) && utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) { + return c.storage.GetList(ctx, key, fallbackOpts, listObj) } if result.ConsistentRead { + // IsTooLargeResourceVersion occurs when the requested RV is higher than cache's current RV + // and cache hasn't caught up within the timeout period. Fall back to etcd. if storage.IsTooLargeResourceVersion(err) { fallback = "true" - // Reset resourceVersion during fallback from consistent read. - opts.ResourceVersion = "" - err = c.storage.GetList(ctx, key, opts, listObj) + err = c.storage.GetList(ctx, key, fallbackOpts, listObj) } if err != nil { success = "false"