From 7da7ddd779f9ea835f0c57deae05e050c543066b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 16:34:28 +0200 Subject: [PATCH] Refactor compaction in etcd3 tests --- .../apiserver/pkg/storage/etcd3/store_test.go | 29 +++++++++++++------ .../pkg/storage/etcd3/watcher_test.go | 15 +++++----- .../pkg/storage/testing/store_tests.go | 2 ++ 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index f367d8327b7..6b6267268c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -253,8 +253,26 @@ func TestListContinuationWithFilter(t *testing.T) { storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } +func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { + return func(ctx context.Context, t *testing.T, resourceVersion string) { + versioner := storage.APIObjectVersioner{} + rv, err := versioner.ParseResourceVersion(resourceVersion) + if err != nil { + t.Fatal(err) + } + if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil { + t.Fatalf("Unable to compact, %v", err) + } + } +} + func TestListInconsistentContinuation(t *testing.T) { ctx, store, client := testSetup(t) + compaction := compactStorage(client) + + if compaction == nil { + t.Skipf("compaction callback not provided") + } // Setup storage with the following structure: // / @@ -342,15 +360,8 @@ func TestListInconsistentContinuation(t *testing.T) { } // compact to latest revision. - versioner := storage.APIObjectVersioner{} lastRVString := preset[2].storedObj.ResourceVersion - lastRV, err := versioner.ParseResourceVersion(lastRVString) - if err != nil { - t.Fatal(err) - } - if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil { - t.Fatalf("Unable to compact, %v", err) - } + compaction(ctx, t, lastRVString) // The old continue token should have expired options = storage.ListOptions{ @@ -358,7 +369,7 @@ func TestListInconsistentContinuation(t *testing.T) { Predicate: pred(0, continueFromSecondItem), Recursive: true, } - err = store.GetList(ctx, "/", options, out) + err := store.GetList(ctx, "/", options, out) if err == nil { t.Fatalf("unexpected no error") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index cf76a162719..8e387cc266d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -54,6 +54,8 @@ func TestDeleteTriggerWatch(t *testing.T) { // - watch from 0 is able to return events for objects whose previous version has been compacted func TestWatchFromZero(t *testing.T) { ctx, store, client := testSetup(t) + compaction := compactStorage(client) + key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) @@ -81,6 +83,10 @@ func TestWatchFromZero(t *testing.T) { storagetesting.TestCheckResult(t, watch.Added, w, out) w.Stop() + if compaction == nil { + t.Skip("compaction callback not provided") + } + // Update again out = &example.Pod{} err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( @@ -92,14 +98,7 @@ func TestWatchFromZero(t *testing.T) { } // Compact previous versions - revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion) - if err != nil { - t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) - } - _, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) - if err != nil { - t.Fatalf("Error compacting: %v", err) - } + compaction(ctx, t, out.ResourceVersion) // Make sure we can still watch from 0 and receive an ADDED event w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index f7c92a3ff81..7dec4985691 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -1410,6 +1410,8 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store } } +type Compaction func(ctx context.Context, t *testing.T, resourceVersion string) + type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer type InterfaceWithPrefixTransformer interface {