diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 45e226d0210..54c599f2d6b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -202,212 +202,6 @@ func TestListWithoutPaging(t *testing.T) { storagetesting.RunTestListWithoutPaging(ctx, t, store) } -func TestListContinuation(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - transformer := store.transformer.(*storagetesting.PrefixTransformer) - recorder := &clientRecorder{KV: etcdClient.KV} - etcdClient.KV = recorder - - // Setup storage with the following structure: - // / - // - one-level/ - // | - test - // | - // - two-level/ - // - 1/ - // | - test - // | - // - 2/ - // - test - // - preset := []struct { - key string - obj *example.Pod - storedObj *example.Pod - }{ - { - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }, - } - - for i, ps := range preset { - preset[i].storedObj = &example.Pod{} - err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - } - - // test continuations - out := &example.PodList{} - pred := func(limit int64, continueValue string) storage.SelectionPredicate { - return storage.SelectionPredicate{ - Limit: limit, - Continue: continueValue, - Label: labels.Everything(), - Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - } - } - options := storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, ""), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get initial list: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - continueFromSecondItem := out.Continue - - // no limit, should get two items - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(0, continueFromSecondItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Fatalf("Unexpected continuation token set") - } - key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/") - t.Logf("continue token was %d %s %v", rv, key, err) - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 2 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - // limit, should get two more pages - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, continueFromSecondItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - continueFromThirdItem := out.Continue - - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, continueFromThirdItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Fatalf("Unexpected continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } -} - -func TestListPaginationRareObject(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - transformer := store.transformer.(*storagetesting.PrefixTransformer) - recorder := &clientRecorder{KV: etcdClient.KV} - etcdClient.KV = recorder - - podCount := 1000 - var pods []*example.Pod - for i := 0; i < podCount; i++ { - key := fmt.Sprintf("/one-level/pod-%d", i) - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}} - storedObj := &example.Pod{} - err := store.Create(ctx, key, obj, storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - pods = append(pods, storedObj) - } - - out := &example.PodList{} - options := storage.ListOptions{ - Predicate: storage.SelectionPredicate{ - Limit: 1, - Label: labels.Everything(), - Field: fields.OneTermEqualSelector("metadata.name", "pod-999"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - }, - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get initial list: %v", err) - } - if len(out.Continue) != 0 { - t.Errorf("Unexpected continuation token set") - } - if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) { - t.Fatalf("Unexpected first page: %#v", out.Items) - } - if reads := transformer.GetReadsAndReset(); reads != uint64(podCount) { - t.Errorf("unexpected reads: %d", reads) - } - // We expect that kube-apiserver will be increasing page sizes - // if not full pages are received, so we should see significantly less - // than 1000 pages (which would be result of talking to etcd with page size - // copied from pred.Limit). - // The expected number of calls is n+1 where n is the smallest n so that: - // pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount. - // For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024. - if recorder.reads != 10 { - t.Errorf("unexpected reads: %d", recorder.reads) - } -} - type clientRecorder struct { reads uint64 clientv3.KV @@ -418,8 +212,60 @@ func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.O return r.KV.Get(ctx, key, opts...) } -func (r *clientRecorder) resetReads() { - r.reads = 0 +func (r *clientRecorder) GetReadsAndReset() uint64 { + result := atomic.LoadUint64(&r.reads) + atomic.StoreUint64(&r.reads, 0) + return result +} + +func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { + return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) { + if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects { + t.Errorf("unexpected reads: %d, expected: %d", reads, estimatedProcessedObjects) + } + estimatedGetCalls := uint64(1) + if pageSize != 0 { + // We expect that kube-apiserver will be increasing page sizes + // if not full pages are received, so we should see significantly less + // than 1000 pages (which would be result of talking to etcd with page size + // copied from pred.Limit). + // The expected number of calls is n+1 where n is the smallest n so that: + // pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount. + // For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024. + currLimit := pageSize + for sum := uint64(1); sum < estimatedProcessedObjects; { + currLimit *= 2 + if currLimit > maxLimit { + currLimit = maxLimit + } + sum += currLimit + estimatedGetCalls++ + } + } + if reads := recorder.GetReadsAndReset(); reads != estimatedGetCalls { + t.Errorf("unexpected reads: %d", reads) + } + } +} + +func TestListContinuation(t *testing.T) { + ctx, store, etcdClient := testSetup(t) + transformer := store.transformer.(*storagetesting.PrefixTransformer) + recorder := &clientRecorder{KV: etcdClient.KV} + etcdClient.KV = recorder + validation := checkStorageCallsInvariants(transformer, recorder) + + storagetesting.RunTestListContinuation(ctx, t, store, validation) +} + +func TestListPaginationRareObject(t *testing.T) { + ctx, store, etcdClient := testSetup(t) + transformer := store.transformer.(*storagetesting.PrefixTransformer) + recorder := &clientRecorder{KV: etcdClient.KV} + etcdClient.KV = recorder + validation := checkStorageCallsInvariants(transformer, recorder) + + storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation) } func TestListContinuationWithFilter(t *testing.T) { @@ -427,104 +273,9 @@ func TestListContinuationWithFilter(t *testing.T) { transformer := store.transformer.(*storagetesting.PrefixTransformer) recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder + validation := checkStorageCallsInvariants(transformer, recorder) - preset := []struct { - key string - obj *example.Pod - storedObj *example.Pod - }{ - { - key: "/1", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/2", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match - }, - { - key: "/3", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/4", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - } - - for i, ps := range preset { - preset[i].storedObj = &example.Pod{} - err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - } - - // the first list call should try to get 2 items from etcd (and only those items should be returned) - // the field selector should result in it reading 3 items via the transformer - // the chunking should result in 2 etcd Gets - // there should be a continueValue because there is more data - out := &example.PodList{} - pred := func(limit int64, continueValue string) storage.SelectionPredicate { - return storage.SelectionPredicate{ - Limit: limit, - Continue: continueValue, - Label: labels.Everything(), - Field: fields.OneTermNotEqualSelector("metadata.name", "bar"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - } - } - options := storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(2, ""), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Errorf("Unable to get initial list: %v", err) - } - if len(out.Continue) == 0 { - t.Errorf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 3 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 2 { - t.Errorf("unexpected reads: %d", recorder.reads) - } - recorder.resetReads() - - // the rest of the test does not make sense if the previous call failed - if t.Failed() { - return - } - - cont := out.Continue - - // the second list call should try to get 2 more items from etcd - // but since there is only one item left, that is all we should get with no continueValue - // both read counters should be incremented for the singular calls they make in this case - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(2, cont), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Errorf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Errorf("Unexpected continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items) - if reads := transformer.GetReadsAndReset(); reads != 1 { - t.Errorf("unexpected reads: %d", reads) - } - if recorder.reads != 1 { - t.Errorf("unexpected reads: %d", recorder.reads) - } + storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } func TestListInconsistentContinuation(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 75030157553..f7c92a3ff81 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math" + "reflect" "strconv" "sync" "testing" @@ -1143,6 +1144,272 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage } } +type CallsValidation func(t *testing.T, pageSize, estimatedProcessedObjects uint64) + +func RunTestListContinuation(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) { + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // test continuations + out := &example.PodList{} + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + options := storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, ""), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) + if validation != nil { + validation(t, 1, 1) + } + + continueFromSecondItem := out.Continue + + // no limit, should get two items + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(0, continueFromSecondItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/") + t.Logf("continue token was %d %s %v", rv, key, err) + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items) + if validation != nil { + validation(t, 0, 2) + } + + // limit, should get two more pages + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, continueFromSecondItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) + if validation != nil { + validation(t, 1, 1) + } + + continueFromThirdItem := out.Continue + + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, continueFromThirdItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) + if validation != nil { + validation(t, 1, 1) + } +} + +func RunTestListPaginationRareObject(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) { + podCount := 1000 + var pods []*example.Pod + for i := 0; i < podCount; i++ { + key := fmt.Sprintf("/one-level/pod-%d", i) + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}} + storedObj := &example.Pod{} + err := store.Create(ctx, key, obj, storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + pods = append(pods, storedObj) + } + + out := &example.PodList{} + options := storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Limit: 1, + Label: labels.Everything(), + Field: fields.OneTermEqualSelector("metadata.name", "pod-999"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + }, + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) != 0 { + t.Errorf("Unexpected continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) { + t.Fatalf("Unexpected first page: %#v", out.Items) + } + if validation != nil { + validation(t, 1, uint64(podCount)) + } +} + +func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) { + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/1", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/2", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match + }, + { + key: "/3", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/4", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // the first list call should try to get 2 items from etcd (and only those items should be returned) + // the field selector should result in it reading 3 items via the transformer + // the chunking should result in 2 etcd Gets + // there should be a continueValue because there is more data + out := &example.PodList{} + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.OneTermNotEqualSelector("metadata.name", "bar"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + options := storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(2, ""), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Errorf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Errorf("No continuation token set") + } + ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items) + if validation != nil { + validation(t, 2, 3) + } + + // the rest of the test does not make sense if the previous call failed + if t.Failed() { + return + } + + cont := out.Continue + + // the second list call should try to get 2 more items from etcd + // but since there is only one item left, that is all we should get with no continueValue + // both read counters should be incremented for the singular calls they make in this case + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(2, cont), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Errorf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Errorf("Unexpected continuation token set") + } + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items) + if validation != nil { + validation(t, 2, 1) + } +} + type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer type InterfaceWithPrefixTransformer interface {