From 7da7ddd779f9ea835f0c57deae05e050c543066b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 16:34:28 +0200 Subject: [PATCH 1/4] Refactor compaction in etcd3 tests --- .../apiserver/pkg/storage/etcd3/store_test.go | 29 +++++++++++++------ .../pkg/storage/etcd3/watcher_test.go | 15 +++++----- .../pkg/storage/testing/store_tests.go | 2 ++ 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index f367d8327b7..6b6267268c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -253,8 +253,26 @@ func TestListContinuationWithFilter(t *testing.T) { storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } +func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { + return func(ctx context.Context, t *testing.T, resourceVersion string) { + versioner := storage.APIObjectVersioner{} + rv, err := versioner.ParseResourceVersion(resourceVersion) + if err != nil { + t.Fatal(err) + } + if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil { + t.Fatalf("Unable to compact, %v", err) + } + } +} + func TestListInconsistentContinuation(t *testing.T) { ctx, store, client := testSetup(t) + compaction := compactStorage(client) + + if compaction == nil { + t.Skipf("compaction callback not provided") + } // Setup storage with the following structure: // / @@ -342,15 +360,8 @@ func TestListInconsistentContinuation(t *testing.T) { } // compact to latest revision. - versioner := storage.APIObjectVersioner{} lastRVString := preset[2].storedObj.ResourceVersion - lastRV, err := versioner.ParseResourceVersion(lastRVString) - if err != nil { - t.Fatal(err) - } - if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil { - t.Fatalf("Unable to compact, %v", err) - } + compaction(ctx, t, lastRVString) // The old continue token should have expired options = storage.ListOptions{ @@ -358,7 +369,7 @@ func TestListInconsistentContinuation(t *testing.T) { Predicate: pred(0, continueFromSecondItem), Recursive: true, } - err = store.GetList(ctx, "/", options, out) + err := store.GetList(ctx, "/", options, out) if err == nil { t.Fatalf("unexpected no error") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index cf76a162719..8e387cc266d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -54,6 +54,8 @@ func TestDeleteTriggerWatch(t *testing.T) { // - watch from 0 is able to return events for objects whose previous version has been compacted func TestWatchFromZero(t *testing.T) { ctx, store, client := testSetup(t) + compaction := compactStorage(client) + key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) @@ -81,6 +83,10 @@ func TestWatchFromZero(t *testing.T) { storagetesting.TestCheckResult(t, watch.Added, w, out) w.Stop() + if compaction == nil { + t.Skip("compaction callback not provided") + } + // Update again out = &example.Pod{} err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( @@ -92,14 +98,7 @@ func TestWatchFromZero(t *testing.T) { } // Compact previous versions - revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion) - if err != nil { - t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) - } - _, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) - if err != nil { - t.Fatalf("Error compacting: %v", err) - } + compaction(ctx, t, out.ResourceVersion) // Make sure we can still watch from 0 and receive an ADDED event w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index f7c92a3ff81..7dec4985691 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -1410,6 +1410,8 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store } } +type Compaction func(ctx context.Context, t *testing.T, resourceVersion string) + type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer type InterfaceWithPrefixTransformer interface { From b02f172cbdf7e824d13a6a9c3a9b9fe4f3f92afb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 20:47:19 +0200 Subject: [PATCH 2/4] Refactor storage tests using compaction --- .../apiserver/pkg/storage/etcd3/store_test.go | 155 +----------------- .../pkg/storage/etcd3/watcher_test.go | 56 +------ .../pkg/storage/testing/store_tests.go | 154 +++++++++++++++++ .../pkg/storage/testing/watcher_tests.go | 56 +++++++ 4 files changed, 212 insertions(+), 209 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 6b6267268c7..3785ef8ec52 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -23,7 +23,6 @@ import ( "os" "reflect" "strconv" - "strings" "sync" "sync/atomic" "testing" @@ -33,7 +32,6 @@ import ( "google.golang.org/grpc/grpclog" "k8s.io/apimachinery/pkg/api/apitesting" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -268,158 +266,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { func TestListInconsistentContinuation(t *testing.T) { ctx, store, client := testSetup(t) - compaction := compactStorage(client) - - if compaction == nil { - t.Skipf("compaction callback not provided") - } - - // Setup storage with the following structure: - // / - // - one-level/ - // | - test - // | - // - two-level/ - // - 1/ - // | - test - // | - // - 2/ - // - test - // - preset := []struct { - key string - obj *example.Pod - storedObj *example.Pod - }{ - { - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }, - } - - for i, ps := range preset { - preset[i].storedObj = &example.Pod{} - err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - } - - pred := func(limit int64, continueValue string) storage.SelectionPredicate { - return storage.SelectionPredicate{ - Limit: limit, - Continue: continueValue, - Label: labels.Everything(), - Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - } - } - - out := &example.PodList{} - options := storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, ""), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get initial list: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) - - continueFromSecondItem := out.Continue - - // update /two-level/2/test/bar - oldName := preset[2].obj.Name - newPod := &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: oldName, - Labels: map[string]string{ - "state": "new", - }, - }, - } - if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil, - func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { - return newPod, nil, nil - }, newPod); err != nil { - t.Fatalf("update failed: %v", err) - } - - // compact to latest revision. - lastRVString := preset[2].storedObj.ResourceVersion - compaction(ctx, t, lastRVString) - - // The old continue token should have expired - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(0, continueFromSecondItem), - Recursive: true, - } - err := store.GetList(ctx, "/", options, out) - if err == nil { - t.Fatalf("unexpected no error") - } - if !strings.Contains(err.Error(), inconsistentContinue) { - t.Fatalf("unexpected error message %v", err) - } - status, ok := err.(apierrors.APIStatus) - if !ok { - t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err)) - } - inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue - if len(inconsistentContinueFromSecondItem) == 0 { - t.Fatalf("expect non-empty continue token") - } - - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, inconsistentContinueFromSecondItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - validateResourceVersion := storagetesting.ResourceVersionNotOlderThan(lastRVString) - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if err := validateResourceVersion(out.ResourceVersion); err != nil { - t.Fatal(err) - } - continueFromThirdItem := out.Continue - resolvedResourceVersionFromThirdItem := out.ResourceVersion - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, continueFromThirdItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Fatalf("Unexpected continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if out.ResourceVersion != resolvedResourceVersionFromThirdItem { - t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) - } + storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client)) } func newTestLeaseManagerConfig() LeaseManagerConfig { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 8e387cc266d..cadd90cc3bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -49,63 +49,9 @@ func TestDeleteTriggerWatch(t *testing.T) { storagetesting.RunTestDeleteTriggerWatch(ctx, t, store) } -// TestWatchFromZero tests that -// - watch from 0 should sync up and grab the object added before -// - watch from 0 is able to return events for objects whose previous version has been compacted func TestWatchFromZero(t *testing.T) { ctx, store, client := testSetup(t) - compaction := compactStorage(client) - - key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) - - w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, storedObj) - w.Stop() - - // Update - out := &example.Pod{} - err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil - }), nil) - if err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) - } - - // Make sure when we watch from 0 we receive an ADDED event - w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, out) - w.Stop() - - if compaction == nil { - t.Skip("compaction callback not provided") - } - - // Update again - out = &example.Pod{} - err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil - }), nil) - if err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) - } - - // Compact previous versions - compaction(ctx, t, out.ResourceVersion) - - // Make sure we can still watch from 0 and receive an ADDED event - w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, out) + storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client)) } // TestWatchFromNoneZero tests that diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 7dec4985691..686b80944e9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -23,6 +23,7 @@ import ( "math" "reflect" "strconv" + "strings" "sync" "testing" @@ -1412,6 +1413,159 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store type Compaction func(ctx context.Context, t *testing.T, resourceVersion string) +func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { + if compaction == nil { + t.Skipf("compaction callback not provided") + } + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + + out := &example.PodList{} + options := storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, ""), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) + + continueFromSecondItem := out.Continue + + // update /two-level/2/test/bar + oldName := preset[2].obj.Name + newPod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: oldName, + Labels: map[string]string{ + "state": "new", + }, + }, + } + if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return newPod, nil, nil + }, newPod); err != nil { + t.Fatalf("update failed: %v", err) + } + + // compact to latest revision. + lastRVString := preset[2].storedObj.ResourceVersion + compaction(ctx, t, lastRVString) + + // The old continue token should have expired + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(0, continueFromSecondItem), + Recursive: true, + } + err := store.GetList(ctx, "/", options, out) + if err == nil { + t.Fatalf("unexpected no error") + } + if !strings.Contains(err.Error(), "The provided continue parameter is too old ") { + t.Fatalf("unexpected error message %v", err) + } + status, ok := err.(apierrors.APIStatus) + if !ok { + t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err)) + } + inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue + if len(inconsistentContinueFromSecondItem) == 0 { + t.Fatalf("expect non-empty continue token") + } + + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, inconsistentContinueFromSecondItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + validateResourceVersion := ResourceVersionNotOlderThan(lastRVString) + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) + if err := validateResourceVersion(out.ResourceVersion); err != nil { + t.Fatal(err) + } + continueFromThirdItem := out.Continue + resolvedResourceVersionFromThirdItem := out.ResourceVersion + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, continueFromThirdItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) + if out.ResourceVersion != resolvedResourceVersionFromThirdItem { + t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) + } +} + type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer type InterfaceWithPrefixTransformer interface { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index bbdf5c4adff..d8823db90ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -121,6 +121,62 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur } } +// RunTestWatchFromZero tests that +// - watch from 0 should sync up and grab the object added before +// - watch from 0 is able to return events for objects whose previous version has been compacted +func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { + key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) + + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, storedObj) + w.Stop() + + // Update + out := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Make sure when we watch from 0 we receive an ADDED event + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, out) + w.Stop() + + if compaction == nil { + t.Skip("compaction callback not provided") + } + + // Update again + out = &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Compact previous versions + compaction(ctx, t, out.ResourceVersion) + + // Make sure we can still watch from 0 and receive an ADDED event + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, out) +} + func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) { key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) From bbe1ebc82aa019118c91e8447517f91b9d036c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 27 Oct 2022 09:48:06 +0200 Subject: [PATCH 3/4] Minor cleanup of etcd3 tests --- .../apiserver/pkg/storage/etcd3/store_test.go | 316 +++++++++--------- 1 file changed, 164 insertions(+), 152 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 3785ef8ec52..1ed514a384f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -269,6 +269,170 @@ func TestListInconsistentContinuation(t *testing.T) { storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client)) } +func TestCount(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestCount(ctx, t, store) +} + +// ======================================================================= +// Implementation-specific tests are following. +// The following tests are exercising the details of the implementation +// not the actual user-facing contract of storage interface. +// As such, they may focus e.g. on non-functional aspects like performance +// impact. +// ======================================================================= + +func TestPrefix(t *testing.T) { + testcases := map[string]string{ + "custom/prefix": "/custom/prefix", + "/custom//prefix//": "/custom/prefix", + "/registry": "/registry", + } + for configuredPrefix, effectivePrefix := range testcases { + _, store, _ := testSetup(t, withPrefix(configuredPrefix)) + if store.pathPrefix != effectivePrefix { + t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) + } + } +} + +func Test_growSlice(t *testing.T) { + type args struct { + initialCapacity int + initialLen int + v reflect.Value + maxCapacity int + sizes []int + } + tests := []struct { + name string + args args + cap int + len int + }{ + { + name: "empty", + args: args{v: reflect.ValueOf([]example.Pod{})}, + cap: 0, + }, + { + name: "no sizes", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10}, + cap: 10, + }, + { + name: "above maxCapacity", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{1, 12}}, + cap: 10, + }, + { + name: "takes max", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{8, 4}}, + cap: 8, + }, + { + name: "with existing capacity above max", + args: args{initialCapacity: 12, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 12, + }, + { + name: "with existing capacity below max", + args: args{initialCapacity: 5, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 8, + }, + { + name: "with existing capacity and length above max", + args: args{initialCapacity: 12, initialLen: 5, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 12, + len: 5, + }, + { + name: "with existing capacity and length below max", + args: args{initialCapacity: 5, initialLen: 3, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 8, + len: 3, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.args.initialCapacity > 0 { + val := make([]example.Pod, tt.args.initialLen, tt.args.initialCapacity) + for i := 0; i < tt.args.initialLen; i++ { + val[i].Name = fmt.Sprintf("test-%d", i) + } + tt.args.v = reflect.ValueOf(val) + } + // reflection requires that the value be addressable in order to call set, + // so we must ensure the value we created is available on the heap (not a problem + // for normal usage) + if !tt.args.v.CanAddr() { + x := reflect.New(tt.args.v.Type()) + x.Elem().Set(tt.args.v) + tt.args.v = x.Elem() + } + growSlice(tt.args.v, tt.args.maxCapacity, tt.args.sizes...) + if tt.cap != tt.args.v.Cap() { + t.Errorf("Unexpected capacity: got=%d want=%d", tt.args.v.Cap(), tt.cap) + } + if tt.len != tt.args.v.Len() { + t.Errorf("Unexpected length: got=%d want=%d", tt.args.v.Len(), tt.len) + } + for i := 0; i < tt.args.v.Len(); i++ { + nameWanted := fmt.Sprintf("test-%d", i) + val := tt.args.v.Index(i).Interface() + pod, ok := val.(example.Pod) + if !ok || pod.Name != nameWanted { + t.Errorf("Unexpected element value: got=%s, want=%s", pod.Name, nameWanted) + } + } + }) + } +} + +func TestLeaseMaxObjectCount(t *testing.T) { + ctx, store, _ := testSetup(t, withLeaseConfig(LeaseManagerConfig{ + ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, + MaxObjectCount: 2, + })) + + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + out := &example.Pod{} + + testCases := []struct { + key string + expectAttachedCount int64 + }{ + { + key: "testkey1", + expectAttachedCount: 1, + }, + { + key: "testkey2", + expectAttachedCount: 2, + }, + { + key: "testkey3", + // We assume each time has 1 object attached to the lease + // so after granting a new lease, the recorded count is set to 1 + expectAttachedCount: 1, + }, + } + + for _, tc := range testCases { + err := store.Create(ctx, tc.key, obj, out, 120) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + if store.leaseManager.leaseAttachedObjectCount != tc.expectAttachedCount { + t.Errorf("Lease manager recorded count %v should be %v", store.leaseManager.leaseAttachedObjectCount, tc.expectAttachedCount) + } + } +} + +// =================================================== +// Test-setup related function are following. +// =================================================== + func newTestLeaseManagerConfig() LeaseManagerConfig { cfg := NewDefaultLeaseManagerConfig() // As 30s is the default timeout for testing in global configuration, @@ -402,113 +566,6 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *cli return ctx, store, client } -func TestPrefix(t *testing.T) { - testcases := map[string]string{ - "custom/prefix": "/custom/prefix", - "/custom//prefix//": "/custom/prefix", - "/registry": "/registry", - } - for configuredPrefix, effectivePrefix := range testcases { - _, store, _ := testSetup(t, withPrefix(configuredPrefix)) - if store.pathPrefix != effectivePrefix { - t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) - } - } -} - -func Test_growSlice(t *testing.T) { - type args struct { - initialCapacity int - initialLen int - v reflect.Value - maxCapacity int - sizes []int - } - tests := []struct { - name string - args args - cap int - len int - }{ - { - name: "empty", - args: args{v: reflect.ValueOf([]example.Pod{})}, - cap: 0, - }, - { - name: "no sizes", - args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10}, - cap: 10, - }, - { - name: "above maxCapacity", - args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{1, 12}}, - cap: 10, - }, - { - name: "takes max", - args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{8, 4}}, - cap: 8, - }, - { - name: "with existing capacity above max", - args: args{initialCapacity: 12, maxCapacity: 10, sizes: []int{8, 4}}, - cap: 12, - }, - { - name: "with existing capacity below max", - args: args{initialCapacity: 5, maxCapacity: 10, sizes: []int{8, 4}}, - cap: 8, - }, - { - name: "with existing capacity and length above max", - args: args{initialCapacity: 12, initialLen: 5, maxCapacity: 10, sizes: []int{8, 4}}, - cap: 12, - len: 5, - }, - { - name: "with existing capacity and length below max", - args: args{initialCapacity: 5, initialLen: 3, maxCapacity: 10, sizes: []int{8, 4}}, - cap: 8, - len: 3, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.args.initialCapacity > 0 { - val := make([]example.Pod, tt.args.initialLen, tt.args.initialCapacity) - for i := 0; i < tt.args.initialLen; i++ { - val[i].Name = fmt.Sprintf("test-%d", i) - } - tt.args.v = reflect.ValueOf(val) - } - // reflection requires that the value be addressable in order to call set, - // so we must ensure the value we created is available on the heap (not a problem - // for normal usage) - if !tt.args.v.CanAddr() { - x := reflect.New(tt.args.v.Type()) - x.Elem().Set(tt.args.v) - tt.args.v = x.Elem() - } - growSlice(tt.args.v, tt.args.maxCapacity, tt.args.sizes...) - if tt.cap != tt.args.v.Cap() { - t.Errorf("Unexpected capacity: got=%d want=%d", tt.args.v.Cap(), tt.cap) - } - if tt.len != tt.args.v.Len() { - t.Errorf("Unexpected length: got=%d want=%d", tt.args.v.Len(), tt.len) - } - for i := 0; i < tt.args.v.Len(); i++ { - nameWanted := fmt.Sprintf("test-%d", i) - val := tt.args.v.Index(i).Interface() - pod, ok := val.(example.Pod) - if !ok || pod.Name != nameWanted { - t.Errorf("Unexpected element value: got=%s, want=%s", pod.Name, nameWanted) - } - } - }) - } -} - // fancyTransformer creates next object on each call to // TransformFromStorage call. type fancyTransformer struct { @@ -616,48 +673,3 @@ func TestConsistentList(t *testing.T) { storagetesting.ExpectNoDiff(t, "incorrect lists", result3, result4) } - -func TestCount(t *testing.T) { - ctx, store, _ := testSetup(t) - storagetesting.RunTestCount(ctx, t, store) -} - -func TestLeaseMaxObjectCount(t *testing.T) { - ctx, store, _ := testSetup(t, withLeaseConfig(LeaseManagerConfig{ - ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, - MaxObjectCount: 2, - })) - - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - out := &example.Pod{} - - testCases := []struct { - key string - expectAttachedCount int64 - }{ - { - key: "testkey1", - expectAttachedCount: 1, - }, - { - key: "testkey2", - expectAttachedCount: 2, - }, - { - key: "testkey3", - // We assume each time has 1 object attached to the lease - // so after granting a new lease, the recorded count is set to 1 - expectAttachedCount: 1, - }, - } - - for _, tc := range testCases { - err := store.Create(ctx, tc.key, obj, out, 120) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - if store.leaseManager.leaseAttachedObjectCount != tc.expectAttachedCount { - t.Errorf("Lease manager recorded count %v should be %v", store.leaseManager.leaseAttachedObjectCount, tc.expectAttachedCount) - } - } -} From cd5da36c92f3ed52debc22c39a7cd9d369b0eecd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 27 Oct 2022 10:19:09 +0200 Subject: [PATCH 4/4] Refactor etcd3 list consistency test --- .../apiserver/pkg/storage/etcd3/store_test.go | 123 +----------------- .../pkg/storage/testing/store_tests.go | 88 +++++++++++++ .../apiserver/pkg/storage/testing/utils.go | 30 +++++ 3 files changed, 123 insertions(+), 118 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 1ed514a384f..c0536fa4b57 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -22,8 +22,6 @@ import ( "io/ioutil" "os" "reflect" - "strconv" - "sync" "sync/atomic" "testing" @@ -33,8 +31,6 @@ import ( "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -269,6 +265,11 @@ func TestListInconsistentContinuation(t *testing.T) { storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client)) } +func TestConsistentList(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestConsistentList(ctx, t, &storeWithPrefixTransformer{store}) +} + func TestCount(t *testing.T) { ctx, store, _ := testSetup(t) storagetesting.RunTestCount(ctx, t, store) @@ -509,12 +510,6 @@ func withoutPaging() setupOption { } } -func withTransformer(transformer value.Transformer) setupOption { - return func(options *setupOptions) { - options.transformer = transformer - } -} - func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption { return func(options *setupOptions) { options.leaseConfig = leaseConfig @@ -565,111 +560,3 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *cli ctx := context.Background() return ctx, store, client } - -// fancyTransformer creates next object on each call to -// TransformFromStorage call. -type fancyTransformer struct { - transformer value.Transformer - store *store - - lock sync.Mutex - index int -} - -func (t *fancyTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { - if err := t.createObject(ctx); err != nil { - return nil, false, err - } - return t.transformer.TransformFromStorage(ctx, data, dataCtx) -} - -func (t *fancyTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { - return t.transformer.TransformToStorage(ctx, data, dataCtx) -} - -func (t *fancyTransformer) createObject(ctx context.Context) error { - t.lock.Lock() - defer t.lock.Unlock() - - t.index++ - key := fmt.Sprintf("pod-%d", t.index) - obj := &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: key, - Labels: map[string]string{ - "even": strconv.FormatBool(t.index%2 == 0), - }, - }, - } - out := &example.Pod{} - return t.store.Create(ctx, key, obj, out, 0) -} - -func TestConsistentList(t *testing.T) { - transformer := &fancyTransformer{ - transformer: newTestTransformer(), - } - ctx, store, _ := testSetup(t, withTransformer(transformer)) - transformer.store = store - - for i := 0; i < 5; i++ { - if err := transformer.createObject(ctx); err != nil { - t.Fatalf("failed to create object: %v", err) - } - } - - getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod, ok := obj.(*example.Pod) - if !ok { - return nil, nil, fmt.Errorf("invalid object") - } - return labels.Set(pod.Labels), nil, nil - } - predicate := storage.SelectionPredicate{ - Label: labels.Set{"even": "true"}.AsSelector(), - GetAttrs: getAttrs, - Limit: 4, - } - - result1 := example.PodList{} - options := storage.ListOptions{ - Predicate: predicate, - Recursive: true, - } - if err := store.GetList(ctx, "/", options, &result1); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - // List objects from the returned resource version. - options = storage.ListOptions{ - Predicate: predicate, - ResourceVersion: result1.ResourceVersion, - ResourceVersionMatch: metav1.ResourceVersionMatchExact, - Recursive: true, - } - - result2 := example.PodList{} - if err := store.GetList(ctx, "/", options, &result2); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - storagetesting.ExpectNoDiff(t, "incorrect lists", result1, result2) - - // Now also verify the ResourceVersionMatchNotOlderThan. - options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan - - result3 := example.PodList{} - if err := store.GetList(ctx, "/", options, &result3); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - options.ResourceVersion = result3.ResourceVersion - options.ResourceVersionMatch = metav1.ResourceVersionMatchExact - - result4 := example.PodList{} - if err := store.GetList(ctx, "/", options, &result4); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - storagetesting.ExpectNoDiff(t, "incorrect lists", result3, result4) -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 686b80944e9..e6d98a0ecc8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -1574,6 +1574,94 @@ type InterfaceWithPrefixTransformer interface { UpdatePrefixTransformer(PrefixTransformerModifier) func() } +func RunTestConsistentList(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { + nextPod := func(index uint32) (string, *example.Pod) { + key := fmt.Sprintf("pod-%d", index) + obj := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: key, + Labels: map[string]string{ + "even": strconv.FormatBool(index%2 == 0), + }, + }, + } + return key, obj + } + + transformer := &reproducingTransformer{ + store: store, + nextObject: nextPod, + } + + revertTransformer := store.UpdatePrefixTransformer( + func(previousTransformer *PrefixTransformer) value.Transformer { + transformer.wrapped = previousTransformer + return transformer + }) + defer revertTransformer() + + for i := 0; i < 5; i++ { + if err := transformer.createObject(ctx); err != nil { + t.Fatalf("failed to create object: %v", err) + } + } + + getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod, ok := obj.(*example.Pod) + if !ok { + return nil, nil, fmt.Errorf("invalid object") + } + return labels.Set(pod.Labels), nil, nil + } + predicate := storage.SelectionPredicate{ + Label: labels.Set{"even": "true"}.AsSelector(), + GetAttrs: getAttrs, + Limit: 4, + } + + result1 := example.PodList{} + options := storage.ListOptions{ + Predicate: predicate, + Recursive: true, + } + if err := store.GetList(ctx, "/", options, &result1); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + // List objects from the returned resource version. + options = storage.ListOptions{ + Predicate: predicate, + ResourceVersion: result1.ResourceVersion, + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + Recursive: true, + } + + result2 := example.PodList{} + if err := store.GetList(ctx, "/", options, &result2); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + ExpectNoDiff(t, "incorrect lists", result1, result2) + + // Now also verify the ResourceVersionMatchNotOlderThan. + options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan + + result3 := example.PodList{} + if err := store.GetList(ctx, "/", options, &result3); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + options.ResourceVersion = result3.ResourceVersion + options.ResourceVersionMatch = metav1.ResourceVersionMatchExact + + result4 := example.PodList{} + if err := store.GetList(ctx, "/", options, &result4); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + ExpectNoDiff(t, "incorrect lists", result3, result4) +} + func RunTestGuaranteedUpdate(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer, validation KeyValidation) { key := "/testkey" diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index c6bca507b8f..6c595589839 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -235,3 +235,33 @@ func (p *PrefixTransformer) TransformToStorage(ctx context.Context, data []byte, func (p *PrefixTransformer) GetReadsAndReset() uint64 { return atomic.SwapUint64(&p.reads, 0) } + +// reproducingTransformer is a custom test-only transformer used purely +// for testing consistency. +// It allows for creating predefined objects on TransformFromStorage operations, +// which allows for precise in time injection of new objects in the middle of +// read operations. +type reproducingTransformer struct { + wrapped value.Transformer + store storage.Interface + + index uint32 + nextObject func(uint32) (string, *example.Pod) +} + +func (rt *reproducingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { + if err := rt.createObject(ctx); err != nil { + return nil, false, err + } + return rt.wrapped.TransformFromStorage(ctx, data, dataCtx) +} + +func (rt *reproducingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { + return rt.wrapped.TransformToStorage(ctx, data, dataCtx) +} + +func (rt *reproducingTransformer) createObject(ctx context.Context) error { + key, obj := rt.nextObject(atomic.AddUint32(&rt.index, 1)) + out := &example.Pod{} + return rt.store.Create(ctx, key, obj, out, 0) +}