diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 795aea2e636..5b368202eff 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -182,7 +182,7 @@ func TestList(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true) + storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true) }) } } @@ -192,7 +192,7 @@ func TestConsistentList(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, consistentRead) + storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead) }) } } @@ -203,7 +203,7 @@ func TestGetListNonRecursive(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher) + storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher) }) } } @@ -297,7 +297,7 @@ func TestWatch(t *testing.T) { func TestWatchFromZero(t *testing.T) { ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client)) + storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactWatchCache(cacher, server.V3Client.Client)) } func TestDeleteTriggerWatch(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index bd574ab132e..cf83231016b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string { return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) } -func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction { +func compactWatchCache(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction { return func(ctx context.Context, t *testing.T, resourceVersion string) { versioner := storage.APIObjectVersioner{} rv, err := versioner.ParseResourceVersion(resourceVersion) @@ -117,12 +117,16 @@ func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.C c.cacher.watchCache.startIndex++ } c.cacher.watchCache.listResourceVersion = rv - - if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil { - t.Fatalf("Could not update compact_rev_key: %v", err) - } - if _, err = client.Compact(ctx, int64(rv)); err != nil { + if _, err := client.Compact(ctx, int64(rv)); err != nil { t.Fatalf("Could not compact: %v", err) } } } + +func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc { + return func(ctx context.Context, t *testing.T) { + if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil { + t.Fatalf("Could not update increaseRV: %v", err) + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index c7238c17f58..8fcee6f4950 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -172,7 +172,7 @@ func TestListPaging(t *testing.T) { func TestGetListNonRecursive(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store) + storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(client.Client), store) } func TestGetListRecursivePrefix(t *testing.T) { @@ -249,12 +249,12 @@ func TestTransformationFailure(t *testing.T) { func TestList(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false) + storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false) } func TestConsistentList(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true) + storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true) } func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { @@ -313,19 +313,27 @@ func TestNamespaceScopedList(t *testing.T) { storagetesting.RunTestNamespaceScopedList(ctx, t, store) } -func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { +func compactStorage(client *clientv3.Client) storagetesting.Compaction { return func(ctx context.Context, t *testing.T, resourceVersion string) { versioner := storage.APIObjectVersioner{} rv, err := versioner.ParseResourceVersion(resourceVersion) if err != nil { t.Fatal(err) } - if _, _, err = compact(ctx, etcdClient, 0, int64(rv)); err != nil { + if _, err = client.Compact(ctx, int64(rv)); err != nil { t.Fatalf("Unable to compact, %v", err) } } } +func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc { + return func(ctx context.Context, t *testing.T) { + if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil { + t.Fatalf("Could not update increaseRV: %v", err) + } + } +} + func TestListInconsistentContinuation(t *testing.T) { ctx, store, client := testSetup(t) storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client)) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index f8ed2a60870..bba452676c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -622,7 +622,7 @@ func RunTestPreconditionalDeleteWithOnlySuggestionPass(ctx context.Context, t *t expectNoDiff(t, "incorrect pod:", updatedPod, out) } -func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, ignoreWatchCacheTests bool) { +func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, ignoreWatchCacheTests bool) { initialRV, createdPods, updatedPod, err := seedMultiLevelData(ctx, store) if err != nil { t.Fatal(err) @@ -648,10 +648,8 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com pod := obj.(*example.Pod) return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil } - // Use compact to increase etcd global revision without changes to any resources. - // The increase in resources version comes from Kubernetes compaction updating hidden key. - // Used to test consistent List to confirm it returns latest etcd revision. - compaction(ctx, t, initialRV) + // Increase RV to test consistent List. + increaseRV(ctx, t) currentRV := fmt.Sprintf("%d", continueRV+1) tests := []struct { @@ -1591,7 +1589,7 @@ func ExpectContinueMatches(t *testing.T, expect, got string) { t.Errorf("expected continue token: %s, got: %s", expectDecoded, gotDecoded) } -func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) { +func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported bool) { outPod := &example.Pod{} inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}} err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0) @@ -1599,7 +1597,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte t.Errorf("Unexpected error: %v", err) } lastObjecRV := outPod.ResourceVersion - compaction(ctx, t, outPod.ResourceVersion) + increaseRV(ctx, t) parsedRV, _ := strconv.Atoi(outPod.ResourceVersion) currentRV := fmt.Sprintf("%d", parsedRV+1) @@ -1609,7 +1607,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte } secondNonConsistentReadRV := lastObjecRV - if consistentReadsSupported { + if !cacheEnabled || consistentReadsSupported { secondNonConsistentReadRV = currentRV } @@ -1749,7 +1747,7 @@ func seedMultiLevelData(ctx context.Context, store storage.Interface) (initialRV return initialRV, created, updated, nil } -func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Compaction, store storage.Interface) { +func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, increaseRV IncreaseRVFunc, store storage.Interface) { key, prevStoredObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion) @@ -1763,10 +1761,8 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Co t.Fatalf("update failed: %v", err) } objRV, _ := strconv.Atoi(storedObj.ResourceVersion) - // Use compact to increase etcd global revision without changes to any resources. - // The increase in resources version comes from Kubernetes compaction updating hidden key. - // Used to test consistent List to confirm it returns latest etcd revision. - compaction(ctx, t, prevStoredObj.ResourceVersion) + // Increase RV to test consistent List. + increaseRV(ctx, t) tests := []struct { name string @@ -2320,6 +2316,7 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store } type Compaction func(ctx context.Context, t *testing.T, resourceVersion string) +type IncreaseRVFunc func(ctx context.Context, t *testing.T) func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { if compaction == nil {