From 0b8e79580eb3a63ca7707626b4894adfb9125586 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 4 Mar 2024 19:35:34 +0100 Subject: [PATCH] Fix enabling consistent list from watch cache also works for resourceVersion=0 --- .../apiserver/pkg/storage/cacher/cacher.go | 2 +- .../pkg/storage/cacher/cacher_test.go | 35 +++++++-- .../apiserver/pkg/storage/etcd3/store_test.go | 9 ++- .../pkg/storage/testing/store_tests.go | 71 ++++++++++++++++++- 4 files changed, 109 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 819cf56698b..f94feb7c4fe 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -773,7 +773,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio // minimal resource version, simply forward the request to storage. return c.storage.GetList(ctx, key, opts, listObj) } - if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) { + if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) { listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String()) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 1fafb58bbca..9ace1829492 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -167,20 +167,47 @@ func TestPreconditionalDeleteWithSuggestionPass(t *testing.T) { } func TestList(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)() ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) } -func TestListWithListFromCache(t *testing.T) { - // TODO(https://github.com/etcd-io/etcd/issues/17507): Remove skip. - t.Skip("This test flakes flakes due to https://github.com/etcd-io/etcd/issues/17507") +func TestListWithConsistentListFromCache(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)() ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) + // Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507 + // TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix. + err := cacher.ready.wait(ctx) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) } +func TestConsistentList(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)() + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false) +} + +func TestConsistentListWithConsistentListFromCache(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)() + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + // Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507 + // TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix. + err := cacher.ready.wait(ctx) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true) +} + func TestGetListNonRecursive(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) @@ -216,7 +243,7 @@ func TestListInconsistentContinuation(t *testing.T) { storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil) } -func TestConsistentList(t *testing.T) { +func TestListResourceVersionMatch(t *testing.T) { // TODO(#109831): Enable use of this test and run it. } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 182a4aea5c4..ef5bc3083ba 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -216,6 +216,11 @@ func TestList(t *testing.T) { storagetesting.RunTestList(ctx, t, store, compactStorage(client), false) } +func TestConsistentList(t *testing.T) { + ctx, store, client := testSetup(t) + storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true) +} + func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) { if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects { @@ -285,9 +290,9 @@ func TestListInconsistentContinuation(t *testing.T) { storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client)) } -func TestConsistentList(t *testing.T) { +func TestListResourceVersionMatch(t *testing.T) { ctx, store, _ := testSetup(t) - storagetesting.RunTestConsistentList(ctx, t, &storeWithPrefixTransformer{store}) + storagetesting.RunTestListResourceVersionMatch(ctx, t, &storeWithPrefixTransformer{store}) } func TestCount(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index c5a1d98ec7c..65aa39cd5a4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -1170,6 +1170,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com expectRV: currentRV, expectedOut: []example.Pod{}, }, + { + name: "test non-consistent List", + prefix: "/pods/empty", + pred: storage.Everything, + rv: "0", + expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion), + expectedOut: []example.Pod{}, + }, } for _, tt := range tests { @@ -1242,6 +1250,67 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com } } +func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) { + outPod := &example.Pod{} + inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}} + err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + lastObjecRV := outPod.ResourceVersion + compaction(ctx, t, outPod.ResourceVersion) + parsedRV, _ := strconv.Atoi(outPod.ResourceVersion) + currentRV := fmt.Sprintf("%d", parsedRV+1) + + firstNonConsistentReadRV := lastObjecRV + if consistentReadsSupported && !cacheEnabled { + firstNonConsistentReadRV = currentRV + } + + secondNonConsistentReadRV := lastObjecRV + if consistentReadsSupported { + secondNonConsistentReadRV = currentRV + } + + tcs := []struct { + name string + requestRV string + expectResponseRV string + }{ + { + name: "Non-consistent list before sync", + requestRV: "0", + expectResponseRV: firstNonConsistentReadRV, + }, + { + name: "Consistent request returns currentRV", + requestRV: "", + expectResponseRV: currentRV, + }, + { + name: "Non-consistent request after sync returns currentRV", + requestRV: "0", + expectResponseRV: secondNonConsistentReadRV, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + out := &example.PodList{} + opts := storage.ListOptions{ + ResourceVersion: tc.requestRV, + Predicate: storage.Everything, + } + err = store.GetList(ctx, "/pods/empty", opts, out) + if err != nil { + t.Fatalf("GetList failed: %v", err) + } + if out.ResourceVersion != tc.expectResponseRV { + t.Errorf("resourceVersion in list response want=%s, got=%s", tc.expectResponseRV, out.ResourceVersion) + } + }) + } +} + // seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion // from before any were created along with the full set of objects that were persisted func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) { @@ -1952,7 +2021,7 @@ type InterfaceWithPrefixTransformer interface { UpdatePrefixTransformer(PrefixTransformerModifier) func() } -func RunTestConsistentList(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { +func RunTestListResourceVersionMatch(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { nextPod := func(index uint32) (string, *example.Pod) { obj := &example.Pod{ ObjectMeta: metav1.ObjectMeta{