From b02f172cbdf7e824d13a6a9c3a9b9fe4f3f92afb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 20:47:19 +0200 Subject: [PATCH] Refactor storage tests using compaction --- .../apiserver/pkg/storage/etcd3/store_test.go | 155 +----------------- .../pkg/storage/etcd3/watcher_test.go | 56 +------ .../pkg/storage/testing/store_tests.go | 154 +++++++++++++++++ .../pkg/storage/testing/watcher_tests.go | 56 +++++++ 4 files changed, 212 insertions(+), 209 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 6b6267268c7..3785ef8ec52 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -23,7 +23,6 @@ import ( "os" "reflect" "strconv" - "strings" "sync" "sync/atomic" "testing" @@ -33,7 +32,6 @@ import ( "google.golang.org/grpc/grpclog" "k8s.io/apimachinery/pkg/api/apitesting" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -268,158 +266,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { func TestListInconsistentContinuation(t *testing.T) { ctx, store, client := testSetup(t) - compaction := compactStorage(client) - - if compaction == nil { - t.Skipf("compaction callback not provided") - } - - // Setup storage with the following structure: - // / - // - one-level/ - // | - test - // | - // - two-level/ - // - 1/ - // | - test - // | - // - 2/ - // - test - // - preset := []struct { - key string - obj *example.Pod - storedObj *example.Pod - }{ - { - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }, - } - - for i, ps := range preset { - preset[i].storedObj = &example.Pod{} - err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - } - - pred := func(limit int64, continueValue string) storage.SelectionPredicate { - return storage.SelectionPredicate{ - Limit: limit, - Continue: continueValue, - Label: labels.Everything(), - Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - } - } - - out := &example.PodList{} - options := storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, ""), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get initial list: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) - - continueFromSecondItem := out.Continue - - // update /two-level/2/test/bar - oldName := preset[2].obj.Name - newPod := &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: oldName, - Labels: map[string]string{ - "state": "new", - }, - }, - } - if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil, - func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { - return newPod, nil, nil - }, newPod); err != nil { - t.Fatalf("update failed: %v", err) - } - - // compact to latest revision. - lastRVString := preset[2].storedObj.ResourceVersion - compaction(ctx, t, lastRVString) - - // The old continue token should have expired - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(0, continueFromSecondItem), - Recursive: true, - } - err := store.GetList(ctx, "/", options, out) - if err == nil { - t.Fatalf("unexpected no error") - } - if !strings.Contains(err.Error(), inconsistentContinue) { - t.Fatalf("unexpected error message %v", err) - } - status, ok := err.(apierrors.APIStatus) - if !ok { - t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err)) - } - inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue - if len(inconsistentContinueFromSecondItem) == 0 { - t.Fatalf("expect non-empty continue token") - } - - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, inconsistentContinueFromSecondItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - validateResourceVersion := storagetesting.ResourceVersionNotOlderThan(lastRVString) - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if err := validateResourceVersion(out.ResourceVersion); err != nil { - t.Fatal(err) - } - continueFromThirdItem := out.Continue - resolvedResourceVersionFromThirdItem := out.ResourceVersion - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, continueFromThirdItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Fatalf("Unexpected continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if out.ResourceVersion != resolvedResourceVersionFromThirdItem { - t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) - } + storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client)) } func newTestLeaseManagerConfig() LeaseManagerConfig { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 8e387cc266d..cadd90cc3bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -49,63 +49,9 @@ func TestDeleteTriggerWatch(t *testing.T) { storagetesting.RunTestDeleteTriggerWatch(ctx, t, store) } -// TestWatchFromZero tests that -// - watch from 0 should sync up and grab the object added before -// - watch from 0 is able to return events for objects whose previous version has been compacted func TestWatchFromZero(t *testing.T) { ctx, store, client := testSetup(t) - compaction := compactStorage(client) - - key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) - - w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, storedObj) - w.Stop() - - // Update - out := &example.Pod{} - err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil - }), nil) - if err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) - } - - // Make sure when we watch from 0 we receive an ADDED event - w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, out) - w.Stop() - - if compaction == nil { - t.Skip("compaction callback not provided") - } - - // Update again - out = &example.Pod{} - err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil - }), nil) - if err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) - } - - // Compact previous versions - compaction(ctx, t, out.ResourceVersion) - - // Make sure we can still watch from 0 and receive an ADDED event - w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, out) + storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client)) } // TestWatchFromNoneZero tests that diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 7dec4985691..686b80944e9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -23,6 +23,7 @@ import ( "math" "reflect" "strconv" + "strings" "sync" "testing" @@ -1412,6 +1413,159 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store type Compaction func(ctx context.Context, t *testing.T, resourceVersion string) +func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { + if compaction == nil { + t.Skipf("compaction callback not provided") + } + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + + out := &example.PodList{} + options := storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, ""), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) + + continueFromSecondItem := out.Continue + + // update /two-level/2/test/bar + oldName := preset[2].obj.Name + newPod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: oldName, + Labels: map[string]string{ + "state": "new", + }, + }, + } + if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return newPod, nil, nil + }, newPod); err != nil { + t.Fatalf("update failed: %v", err) + } + + // compact to latest revision. + lastRVString := preset[2].storedObj.ResourceVersion + compaction(ctx, t, lastRVString) + + // The old continue token should have expired + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(0, continueFromSecondItem), + Recursive: true, + } + err := store.GetList(ctx, "/", options, out) + if err == nil { + t.Fatalf("unexpected no error") + } + if !strings.Contains(err.Error(), "The provided continue parameter is too old ") { + t.Fatalf("unexpected error message %v", err) + } + status, ok := err.(apierrors.APIStatus) + if !ok { + t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err)) + } + inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue + if len(inconsistentContinueFromSecondItem) == 0 { + t.Fatalf("expect non-empty continue token") + } + + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, inconsistentContinueFromSecondItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + validateResourceVersion := ResourceVersionNotOlderThan(lastRVString) + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) + if err := validateResourceVersion(out.ResourceVersion); err != nil { + t.Fatal(err) + } + continueFromThirdItem := out.Continue + resolvedResourceVersionFromThirdItem := out.ResourceVersion + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, continueFromThirdItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) + if out.ResourceVersion != resolvedResourceVersionFromThirdItem { + t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) + } +} + type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer type InterfaceWithPrefixTransformer interface { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index bbdf5c4adff..d8823db90ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -121,6 +121,62 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur } } +// RunTestWatchFromZero tests that +// - watch from 0 should sync up and grab the object added before +// - watch from 0 is able to return events for objects whose previous version has been compacted +func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { + key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) + + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, storedObj) + w.Stop() + + // Update + out := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Make sure when we watch from 0 we receive an ADDED event + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, out) + w.Stop() + + if compaction == nil { + t.Skip("compaction callback not provided") + } + + // Update again + out = &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Compact previous versions + compaction(ctx, t, out.ResourceVersion) + + // Make sure we can still watch from 0 and receive an ADDED event + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, out) +} + func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) { key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})