From 002c75442d768d2bcc51047667354ff16bbfa2e8 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Fri, 28 Feb 2020 15:35:22 -0500 Subject: [PATCH] storage: confirm that paging and predicate filtering work together This change adds the TestListContinuationWithFilter test which confirms that paging with a predicate that does not match everything results in the correct amount of calls to TransformFromStorage and KV.Get. The partial result of each paging call is also asserted. Signed-off-by: Monis Khan --- .../apiserver/pkg/storage/etcd3/store_test.go | 195 ++++++++++++++++-- .../pkg/storage/etcd3/watcher_test.go | 4 +- 2 files changed, 182 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index e3f625b50e3..51364728317 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -27,6 +27,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "github.com/coreos/pkg/capnslog" @@ -71,9 +72,11 @@ type prefixTransformer struct { prefix []byte stale bool err error + reads uint64 } -func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) { +func (p *prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) { + atomic.AddUint64(&p.reads, 1) if ctx == nil { panic("no context provided") } @@ -82,7 +85,7 @@ func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([] } return bytes.TrimPrefix(b, p.prefix), p.stale, p.err } -func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) { +func (p *prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) { if ctx == nil { panic("no context provided") } @@ -92,6 +95,10 @@ func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]by return b, p.err } +func (p *prefixTransformer) resetReads() { + p.reads = 0 +} + func TestCreate(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -502,11 +509,11 @@ func TestGuaranteedUpdate(t *testing.T) { if tt.expectNoUpdate { name = storeObj.Name } - originalTransformer := store.transformer.(prefixTransformer) + originalTransformer := store.transformer.(*prefixTransformer) if tt.transformStale { - transformer := originalTransformer + transformer := *originalTransformer transformer.stale = true - store.transformer = transformer + store.transformer = &transformer } version := storeObj.ResourceVersion err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, @@ -606,7 +613,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) { t.Fatal(err) } - store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix)} + store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix)} // this update should write the canonical value to etcd because the new serialization differs // from the stored serialization @@ -639,7 +646,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) { t.Errorf("guaranteed update should have short-circuited write, got %#v", out) } - store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true} + store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true} // this update should write to etcd because the transformer reported stale err = store.GuaranteedUpdate(ctx, key, out, true, nil, @@ -752,7 +759,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { @@ -782,7 +789,7 @@ func TestTransformationFailure(t *testing.T) { // create a second resource with an invalid prefix oldTransformer := store.transformer - store.transformer = prefixTransformer{prefix: []byte("otherprefix!")} + store.transformer = &prefixTransformer{prefix: []byte("otherprefix!")} for i, ps := range preset[1:] { preset[1:][i].storedObj = &example.Pod{} err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0) @@ -829,8 +836,8 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) - disablePagingStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1182,7 +1189,11 @@ func TestListContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} + etcdClient := cluster.RandClient() + recorder := &clientRecorder{KV: etcdClient.KV} + etcdClient.KV = recorder + store := newStore(etcdClient, true, codec, "", transformer) ctx := context.Background() // Setup storage with the following structure: @@ -1247,6 +1258,14 @@ func TestListContinuation(t *testing.T) { if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) { t.Fatalf("Unexpected first page: %#v", out.Items) } + if transformer.reads != 1 { + t.Errorf("unexpected reads: %d", transformer.reads) + } + if recorder.reads != 1 { + t.Errorf("unexpected reads: %d", recorder.reads) + } + transformer.resetReads() + recorder.resetReads() continueFromSecondItem := out.Continue @@ -1263,6 +1282,14 @@ func TestListContinuation(t *testing.T) { t.Logf("continue token was %d %s %v", rv, key, err) t.Fatalf("Unexpected second page: %#v", out.Items) } + if transformer.reads != 2 { + t.Errorf("unexpected reads: %d", transformer.reads) + } + if recorder.reads != 1 { + t.Errorf("unexpected reads: %d", recorder.reads) + } + transformer.resetReads() + recorder.resetReads() // limit, should get two more pages out = &example.PodList{} @@ -1275,7 +1302,17 @@ func TestListContinuation(t *testing.T) { if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) { t.Fatalf("Unexpected second page: %#v", out.Items) } + if transformer.reads != 1 { + t.Errorf("unexpected reads: %d", transformer.reads) + } + if recorder.reads != 1 { + t.Errorf("unexpected reads: %d", recorder.reads) + } + transformer.resetReads() + recorder.resetReads() + continueFromThirdItem := out.Continue + out = &example.PodList{} if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil { t.Fatalf("Unable to get second page: %v", err) @@ -1286,14 +1323,142 @@ func TestListContinuation(t *testing.T) { if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) { t.Fatalf("Unexpected third page: %#v", out.Items) } + if transformer.reads != 1 { + t.Errorf("unexpected reads: %d", transformer.reads) + } + if recorder.reads != 1 { + t.Errorf("unexpected reads: %d", recorder.reads) + } + transformer.resetReads() + recorder.resetReads() +} +type clientRecorder struct { + reads uint64 + clientv3.KV +} + +func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + atomic.AddUint64(&r.reads, 1) + return r.KV.Get(ctx, key, opts...) +} + +func (r *clientRecorder) resetReads() { + r.reads = 0 +} + +func TestListContinuationWithFilter(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} + etcdClient := cluster.RandClient() + recorder := &clientRecorder{KV: etcdClient.KV} + etcdClient.KV = recorder + store := newStore(etcdClient, true, codec, "", transformer) + ctx := context.Background() + + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/1", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/2", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match + }, + { + key: "/3", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/4", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // the first list call should try to get 2 items from etcd (and only those items should be returned) + // the field selector should result in it reading 3 items via the transformer + // the chunking should result in 2 etcd Gets + // there should be a continueValue because there is more data + out := &example.PodList{} + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.OneTermNotEqualSelector("metadata.name", "bar"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + if err := store.List(ctx, "/", "0", pred(2, ""), out); err != nil { + t.Errorf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Errorf("No continuation token set") + } + if len(out.Items) != 2 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) || !reflect.DeepEqual(&out.Items[1], preset[2].storedObj) { + t.Errorf("Unexpected first page, len=%d: %#v", len(out.Items), out.Items) + } + if transformer.reads != 3 { + t.Errorf("unexpected reads: %d", transformer.reads) + } + if recorder.reads != 2 { + t.Errorf("unexpected reads: %d", recorder.reads) + } + transformer.resetReads() + recorder.resetReads() + + // the rest of the test does not make sense if the previous call failed + if t.Failed() { + return + } + + cont := out.Continue + + // the second list call should try to get 2 more items from etcd + // but since there is only one item left, that is all we should get with no continueValue + // both read counters should be incremented for the singular calls they make in this case + out = &example.PodList{} + if err := store.List(ctx, "/", "0", pred(2, cont), out); err != nil { + t.Errorf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Errorf("Unexpected continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[3].storedObj) { + t.Errorf("Unexpected second page, len=%d: %#v", len(out.Items), out.Items) + } + if transformer.reads != 1 { + t.Errorf("unexpected reads: %d", transformer.reads) + } + if recorder.reads != 1 { + t.Errorf("unexpected reads: %d", recorder.reads) + } + transformer.resetReads() + recorder.resetReads() } func TestListInconsistentContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1438,7 +1603,7 @@ func TestListInconsistentContinuation(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 @@ -1471,7 +1636,7 @@ func TestPrefix(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - transformer := prefixTransformer{prefix: []byte(defaultTestPrefix)} + transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} testcases := map[string]string{ "custom/prefix": "/custom/prefix", "/custom//prefix//": "/custom/prefix", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 1f2dca8ebf2..9e820cf557d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")}) + invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")}) + validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil