From 8472e1bc13b0fa8aa3a67e5fcf2f13bfd7974cd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 11:30:32 +0200 Subject: [PATCH 1/3] Refactor GetReads --- .../apiserver/pkg/storage/etcd3/store_test.go | 18 +++++++----------- .../apiserver/pkg/storage/testing/utils.go | 8 ++------ 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 8160a94e705..45e226d0210 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -273,13 +273,12 @@ func TestListContinuation(t *testing.T) { t.Fatalf("No continuation token set") } storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) - if reads := transformer.GetReads(); reads != 1 { + if reads := transformer.GetReadsAndReset(); reads != 1 { t.Errorf("unexpected reads: %d", reads) } if recorder.reads != 1 { t.Errorf("unexpected reads: %d", recorder.reads) } - transformer.ResetReads() recorder.resetReads() continueFromSecondItem := out.Continue @@ -300,13 +299,12 @@ func TestListContinuation(t *testing.T) { key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/") t.Logf("continue token was %d %s %v", rv, key, err) storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items) - if reads := transformer.GetReads(); reads != 2 { + if reads := transformer.GetReadsAndReset(); reads != 2 { t.Errorf("unexpected reads: %d", reads) } if recorder.reads != 1 { t.Errorf("unexpected reads: %d", recorder.reads) } - transformer.ResetReads() recorder.resetReads() // limit, should get two more pages @@ -323,13 +321,12 @@ func TestListContinuation(t *testing.T) { t.Fatalf("No continuation token set") } storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if reads := transformer.GetReads(); reads != 1 { + if reads := transformer.GetReadsAndReset(); reads != 1 { t.Errorf("unexpected reads: %d", reads) } if recorder.reads != 1 { t.Errorf("unexpected reads: %d", recorder.reads) } - transformer.ResetReads() recorder.resetReads() continueFromThirdItem := out.Continue @@ -347,7 +344,7 @@ func TestListContinuation(t *testing.T) { t.Fatalf("Unexpected continuation token set") } storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if reads := transformer.GetReads(); reads != 1 { + if reads := transformer.GetReadsAndReset(); reads != 1 { t.Errorf("unexpected reads: %d", reads) } if recorder.reads != 1 { @@ -396,7 +393,7 @@ func TestListPaginationRareObject(t *testing.T) { if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) { t.Fatalf("Unexpected first page: %#v", out.Items) } - if reads := transformer.GetReads(); reads != uint64(podCount) { + if reads := transformer.GetReadsAndReset(); reads != uint64(podCount) { t.Errorf("unexpected reads: %d", reads) } // We expect that kube-apiserver will be increasing page sizes @@ -491,13 +488,12 @@ func TestListContinuationWithFilter(t *testing.T) { t.Errorf("No continuation token set") } storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items) - if reads := transformer.GetReads(); reads != 3 { + if reads := transformer.GetReadsAndReset(); reads != 3 { t.Errorf("unexpected reads: %d", reads) } if recorder.reads != 2 { t.Errorf("unexpected reads: %d", recorder.reads) } - transformer.ResetReads() recorder.resetReads() // the rest of the test does not make sense if the previous call failed @@ -523,7 +519,7 @@ func TestListContinuationWithFilter(t *testing.T) { t.Errorf("Unexpected continuation token set") } storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items) - if reads := transformer.GetReads(); reads != 1 { + if reads := transformer.GetReadsAndReset(); reads != 1 { t.Errorf("unexpected reads: %d", reads) } if recorder.reads != 1 { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index 42db0cbffe5..c6bca507b8f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -232,10 +232,6 @@ func (p *PrefixTransformer) TransformToStorage(ctx context.Context, data []byte, return data, p.err } -func (p *PrefixTransformer) GetReads() uint64 { - return atomic.LoadUint64(&p.reads) -} - -func (p *PrefixTransformer) ResetReads() { - atomic.StoreUint64(&p.reads, 0) +func (p *PrefixTransformer) GetReadsAndReset() uint64 { + return atomic.SwapUint64(&p.reads, 0) } From 6c8ce894e18a2e2cae20edd2f6e79db9407ebce3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 12:18:21 +0200 Subject: [PATCH 2/3] Refactor pagination tests --- .../apiserver/pkg/storage/etcd3/store_test.go | 361 +++--------------- .../pkg/storage/testing/store_tests.go | 267 +++++++++++++ 2 files changed, 323 insertions(+), 305 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 45e226d0210..54c599f2d6b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -202,212 +202,6 @@ func TestListWithoutPaging(t *testing.T) { storagetesting.RunTestListWithoutPaging(ctx, t, store) } -func TestListContinuation(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - transformer := store.transformer.(*storagetesting.PrefixTransformer) - recorder := &clientRecorder{KV: etcdClient.KV} - etcdClient.KV = recorder - - // Setup storage with the following structure: - // / - // - one-level/ - // | - test - // | - // - two-level/ - // - 1/ - // | - test - // | - // - 2/ - // - test - // - preset := []struct { - key string - obj *example.Pod - storedObj *example.Pod - }{ - { - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }, - } - - for i, ps := range preset { - preset[i].storedObj = &example.Pod{} - err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - } - - // test continuations - out := &example.PodList{} - pred := func(limit int64, continueValue string) storage.SelectionPredicate { - return storage.SelectionPredicate{ - Limit: limit, - Continue: continueValue, - Label: labels.Everything(), - Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - } - } - options := storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, ""), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get initial list: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - continueFromSecondItem := out.Continue - - // no limit, should get two items - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(0, continueFromSecondItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Fatalf("Unexpected continuation token set") - } - key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/") - t.Logf("continue token was %d %s %v", rv, key, err) - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 2 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - // limit, should get two more pages - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, continueFromSecondItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - continueFromThirdItem := out.Continue - - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, continueFromThirdItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Fatalf("Unexpected continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } -} - -func TestListPaginationRareObject(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - transformer := store.transformer.(*storagetesting.PrefixTransformer) - recorder := &clientRecorder{KV: etcdClient.KV} - etcdClient.KV = recorder - - podCount := 1000 - var pods []*example.Pod - for i := 0; i < podCount; i++ { - key := fmt.Sprintf("/one-level/pod-%d", i) - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}} - storedObj := &example.Pod{} - err := store.Create(ctx, key, obj, storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - pods = append(pods, storedObj) - } - - out := &example.PodList{} - options := storage.ListOptions{ - Predicate: storage.SelectionPredicate{ - Limit: 1, - Label: labels.Everything(), - Field: fields.OneTermEqualSelector("metadata.name", "pod-999"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - }, - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get initial list: %v", err) - } - if len(out.Continue) != 0 { - t.Errorf("Unexpected continuation token set") - } - if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) { - t.Fatalf("Unexpected first page: %#v", out.Items) - } - if reads := transformer.GetReadsAndReset(); reads != uint64(podCount) { - t.Errorf("unexpected reads: %d", reads) - } - // We expect that kube-apiserver will be increasing page sizes - // if not full pages are received, so we should see significantly less - // than 1000 pages (which would be result of talking to etcd with page size - // copied from pred.Limit). - // The expected number of calls is n+1 where n is the smallest n so that: - // pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount. - // For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024. - if recorder.reads != 10 { - t.Errorf("unexpected reads: %d", recorder.reads) - } -} - type clientRecorder struct { reads uint64 clientv3.KV @@ -418,8 +212,60 @@ func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.O return r.KV.Get(ctx, key, opts...) } -func (r *clientRecorder) resetReads() { - r.reads = 0 +func (r *clientRecorder) GetReadsAndReset() uint64 { + result := atomic.LoadUint64(&r.reads) + atomic.StoreUint64(&r.reads, 0) + return result +} + +func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { + return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) { + if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects { + t.Errorf("unexpected reads: %d, expected: %d", reads, estimatedProcessedObjects) + } + estimatedGetCalls := uint64(1) + if pageSize != 0 { + // We expect that kube-apiserver will be increasing page sizes + // if not full pages are received, so we should see significantly less + // than 1000 pages (which would be result of talking to etcd with page size + // copied from pred.Limit). + // The expected number of calls is n+1 where n is the smallest n so that: + // pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount. + // For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024. + currLimit := pageSize + for sum := uint64(1); sum < estimatedProcessedObjects; { + currLimit *= 2 + if currLimit > maxLimit { + currLimit = maxLimit + } + sum += currLimit + estimatedGetCalls++ + } + } + if reads := recorder.GetReadsAndReset(); reads != estimatedGetCalls { + t.Errorf("unexpected reads: %d", reads) + } + } +} + +func TestListContinuation(t *testing.T) { + ctx, store, etcdClient := testSetup(t) + transformer := store.transformer.(*storagetesting.PrefixTransformer) + recorder := &clientRecorder{KV: etcdClient.KV} + etcdClient.KV = recorder + validation := checkStorageCallsInvariants(transformer, recorder) + + storagetesting.RunTestListContinuation(ctx, t, store, validation) +} + +func TestListPaginationRareObject(t *testing.T) { + ctx, store, etcdClient := testSetup(t) + transformer := store.transformer.(*storagetesting.PrefixTransformer) + recorder := &clientRecorder{KV: etcdClient.KV} + etcdClient.KV = recorder + validation := checkStorageCallsInvariants(transformer, recorder) + + storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation) } func TestListContinuationWithFilter(t *testing.T) { @@ -427,104 +273,9 @@ func TestListContinuationWithFilter(t *testing.T) { transformer := store.transformer.(*storagetesting.PrefixTransformer) recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder + validation := checkStorageCallsInvariants(transformer, recorder) - preset := []struct { - key string - obj *example.Pod - storedObj *example.Pod - }{ - { - key: "/1", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/2", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match - }, - { - key: "/3", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/4", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - } - - for i, ps := range preset { - preset[i].storedObj = &example.Pod{} - err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - } - - // the first list call should try to get 2 items from etcd (and only those items should be returned) - // the field selector should result in it reading 3 items via the transformer - // the chunking should result in 2 etcd Gets - // there should be a continueValue because there is more data - out := &example.PodList{} - pred := func(limit int64, continueValue string) storage.SelectionPredicate { - return storage.SelectionPredicate{ - Limit: limit, - Continue: continueValue, - Label: labels.Everything(), - Field: fields.OneTermNotEqualSelector("metadata.name", "bar"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - } - } - options := storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(2, ""), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Errorf("Unable to get initial list: %v", err) - } - if len(out.Continue) == 0 { - t.Errorf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 3 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 2 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - // the rest of the test does not make sense if the previous call failed - if t.Failed() { - return - } - - cont := out.Continue - - // the second list call should try to get 2 more items from etcd - // but since there is only one item left, that is all we should get with no continueValue - // both read counters should be incremented for the singular calls they make in this case - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(2, cont), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Errorf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Errorf("Unexpected continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } + storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } func TestListInconsistentContinuation(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 75030157553..f7c92a3ff81 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math" + "reflect" "strconv" "sync" "testing" @@ -1143,6 +1144,272 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage } } +type CallsValidation func(t *testing.T, pageSize, estimatedProcessedObjects uint64) + +func RunTestListContinuation(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) { + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // test continuations + out := &example.PodList{} + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + options := storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, ""), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) + if validation != nil { + validation(t, 1, 1) + } + + continueFromSecondItem := out.Continue + + // no limit, should get two items + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(0, continueFromSecondItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/") + t.Logf("continue token was %d %s %v", rv, key, err) + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items) + if validation != nil { + validation(t, 0, 2) + } + + // limit, should get two more pages + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, continueFromSecondItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) + if validation != nil { + validation(t, 1, 1) + } + + continueFromThirdItem := out.Continue + + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, continueFromThirdItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) + if validation != nil { + validation(t, 1, 1) + } +} + +func RunTestListPaginationRareObject(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) { + podCount := 1000 + var pods []*example.Pod + for i := 0; i < podCount; i++ { + key := fmt.Sprintf("/one-level/pod-%d", i) + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}} + storedObj := &example.Pod{} + err := store.Create(ctx, key, obj, storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + pods = append(pods, storedObj) + } + + out := &example.PodList{} + options := storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Limit: 1, + Label: labels.Everything(), + Field: fields.OneTermEqualSelector("metadata.name", "pod-999"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + }, + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) != 0 { + t.Errorf("Unexpected continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) { + t.Fatalf("Unexpected first page: %#v", out.Items) + } + if validation != nil { + validation(t, 1, uint64(podCount)) + } +} + +func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) { + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/1", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/2", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match + }, + { + key: "/3", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/4", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // the first list call should try to get 2 items from etcd (and only those items should be returned) + // the field selector should result in it reading 3 items via the transformer + // the chunking should result in 2 etcd Gets + // there should be a continueValue because there is more data + out := &example.PodList{} + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.OneTermNotEqualSelector("metadata.name", "bar"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + options := storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(2, ""), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Errorf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Errorf("No continuation token set") + } + ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items) + if validation != nil { + validation(t, 2, 3) + } + + // the rest of the test does not make sense if the previous call failed + if t.Failed() { + return + } + + cont := out.Continue + + // the second list call should try to get 2 more items from etcd + // but since there is only one item left, that is all we should get with no continueValue + // both read counters should be incremented for the singular calls they make in this case + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(2, cont), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Errorf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Errorf("Unexpected continuation token set") + } + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items) + if validation != nil { + validation(t, 2, 1) + } +} + type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer type InterfaceWithPrefixTransformer interface { From e04fe81dfcff189ef5162b9b26b55760f613a010 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 13:05:49 +0200 Subject: [PATCH 3/3] Minor cleanup of etcd3 tests --- .../apiserver/pkg/storage/etcd3/store_test.go | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 54c599f2d6b..f367d8327b7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -202,22 +202,6 @@ func TestListWithoutPaging(t *testing.T) { storagetesting.RunTestListWithoutPaging(ctx, t, store) } -type clientRecorder struct { - reads uint64 - clientv3.KV -} - -func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { - atomic.AddUint64(&r.reads, 1) - return r.KV.Get(ctx, key, opts...) -} - -func (r *clientRecorder) GetReadsAndReset() uint64 { - result := atomic.LoadUint64(&r.reads) - atomic.StoreUint64(&r.reads, 0) - return result -} - func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) { if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects { @@ -249,32 +233,23 @@ func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, } func TestListContinuation(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - transformer := store.transformer.(*storagetesting.PrefixTransformer) - recorder := &clientRecorder{KV: etcdClient.KV} - etcdClient.KV = recorder - validation := checkStorageCallsInvariants(transformer, recorder) - + ctx, store, etcdClient := testSetup(t, withRecorder()) + validation := checkStorageCallsInvariants( + store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder)) storagetesting.RunTestListContinuation(ctx, t, store, validation) } func TestListPaginationRareObject(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - transformer := store.transformer.(*storagetesting.PrefixTransformer) - recorder := &clientRecorder{KV: etcdClient.KV} - etcdClient.KV = recorder - validation := checkStorageCallsInvariants(transformer, recorder) - + ctx, store, etcdClient := testSetup(t, withRecorder()) + validation := checkStorageCallsInvariants( + store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder)) storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation) } func TestListContinuationWithFilter(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - transformer := store.transformer.(*storagetesting.PrefixTransformer) - recorder := &clientRecorder{KV: etcdClient.KV} - etcdClient.KV = recorder - validation := checkStorageCallsInvariants(transformer, recorder) - + ctx, store, etcdClient := testSetup(t, withRecorder()) + validation := checkStorageCallsInvariants( + store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder)) storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } @@ -449,6 +424,20 @@ func newTestTransformer() value.Transformer { return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false) } +type clientRecorder struct { + reads uint64 + clientv3.KV +} + +func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + atomic.AddUint64(&r.reads, 1) + return r.KV.Get(ctx, key, opts...) +} + +func (r *clientRecorder) GetReadsAndReset() uint64 { + return atomic.SwapUint64(&r.reads, 0) +} + type setupOptions struct { client func(*testing.T) *clientv3.Client codec runtime.Codec @@ -458,6 +447,8 @@ type setupOptions struct { transformer value.Transformer pagingEnabled bool leaseConfig LeaseManagerConfig + + recorderEnabled bool } type setupOption func(*setupOptions) @@ -508,6 +499,12 @@ func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption { } } +func withRecorder() setupOption { + return func(options *setupOptions) { + options.recorderEnabled = true + } +} + func withDefaults(options *setupOptions) { options.client = func(t *testing.T) *clientv3.Client { return testserver.RunEtcd(t, nil) @@ -530,6 +527,9 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *cli opt(&setupOpts) } client := setupOpts.client(t) + if setupOpts.recorderEnabled { + client.KV = &clientRecorder{KV: client.KV} + } store := newStore( client, setupOpts.codec,