diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 9fc2d6aa2fe..21400048ac9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -145,9 +145,9 @@ func TestPreconditionalDeleteWithSuggestion(t *testing.T) { } func TestList(t *testing.T) { - ctx, cacher, terminate := testSetup(t) + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestList(ctx, t, cacher, true) + storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) } func TestListWithoutPaging(t *testing.T) { @@ -238,9 +238,9 @@ func TestWatch(t *testing.T) { } func TestWatchFromZero(t *testing.T) { - ctx, cacher, terminate := testSetup(t) + ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher)) + storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client)) } func TestDeleteTriggerWatch(t *testing.T) { @@ -365,6 +365,11 @@ func withoutPaging(options *setupOptions) { } func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) { + ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) + return ctx, cacher, tearDown +} + +func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context, *Cacher, *etcd3testing.EtcdTestServer, tearDownFunc) { setupOpts := setupOptions{} opts = append([]setupOption{withDefaults}, opts...) for _, opt := range opts { @@ -407,5 +412,5 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tea t.Fatalf("Failed to inject list errors: %v", err) } - return ctx, cacher, terminate + return ctx, cacher, server, terminate } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index c7c6968be71..f738cdbdea0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -71,7 +73,7 @@ func computePodKey(obj *example.Pod) string { return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) } -func compactStorage(c *Cacher) storagetesting.Compaction { +func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compaction { return func(ctx context.Context, t *testing.T, resourceVersion string) { versioner := storage.APIObjectVersioner{} rv, err := versioner.ParseResourceVersion(resourceVersion) @@ -93,9 +95,6 @@ func compactStorage(c *Cacher) storagetesting.Compaction { if c.watchCache.resourceVersion < rv { t.Fatalf("Can't compact into a future version: %v", resourceVersion) } - if rv < c.watchCache.listResourceVersion { - t.Fatalf("Can't compact into a past version: %v", resourceVersion) - } if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 { // We could consider terminating those watchers, but given @@ -114,6 +113,11 @@ func compactStorage(c *Cacher) storagetesting.Compaction { } c.watchCache.listResourceVersion = rv - // TODO(wojtek-t): We should also compact the underlying etcd storage. + if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil { + t.Fatalf("Could not update compact_rev_key: %v", err) + } + if _, err = client.Compact(ctx, int64(rv)); err != nil { + t.Fatalf("Could not compact: %v", err) + } } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 29be72a6668..ffc9891b8da 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -191,8 +191,8 @@ func TestTransformationFailure(t *testing.T) { } func TestList(t *testing.T) { - ctx, store, _ := testSetup(t) - storagetesting.RunTestList(ctx, t, store, false) + ctx, store, client := testSetup(t) + storagetesting.RunTestList(ctx, t, store, compactStorage(client), false) } func TestListWithoutPaging(t *testing.T) { @@ -258,7 +258,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { if err != nil { t.Fatal(err) } - if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil { + if _, _, err = compact(ctx, etcdClient, 0, int64(rv)); err != nil { t.Fatalf("Unable to compact, %v", err) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index f5dfd72ec69..aa0a8559d94 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -478,7 +478,7 @@ func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T } } -func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ignoreWatchCacheTests bool) { +func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, ignoreWatchCacheTests bool) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)() initialRV, preset, err := seedMultiLevelData(ctx, store) @@ -506,6 +506,11 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign pod := obj.(*example.Pod) return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil } + // Use compact to increase etcd global revision without changes to any resources. + // The increase in resources version comes from Kubernetes compaction updating hidden key. + // Used to test consistent List to confirm it returns latest etcd revision. + compaction(ctx, t, initialRV) + currentRV := fmt.Sprintf("%d", continueRV+1) tests := []struct { name string @@ -706,7 +711,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign expectedRemainingItemCount: utilpointer.Int64(1), rv: list.ResourceVersion, rvMatch: metav1.ResourceVersionMatchNotOlderThan, - expectRV: list.ResourceVersion, + expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion), }, { name: "test List with limit at resource version 0", @@ -1019,6 +1024,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign rvMatch: metav1.ResourceVersionMatchNotOlderThan, expectedOut: []example.Pod{}, }, + { + name: "test consistent List", + prefix: "/pods/empty", + pred: storage.Everything, + rv: "", + expectRV: currentRV, + expectedOut: []example.Pod{}, + }, } for _, tt := range tests {