storage: confirm that paging and predicate filtering work together

This change adds the TestListContinuationWithFilter test which
confirms that paging with a predicate that does not match everything
results in the correct amount of calls to TransformFromStorage and
KV.Get.  The partial result of each paging call is also asserted.

Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
Monis Khan 2020-02-28 15:35:22 -05:00
parent a9c7529f99
commit 002c75442d
No known key found for this signature in database
GPG Key ID: 52C90ADA01B269B8
2 changed files with 182 additions and 17 deletions

View File

@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/coreos/pkg/capnslog"
@ -71,9 +72,11 @@ type prefixTransformer struct {
prefix []byte
stale bool
err error
reads uint64
}
func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
func (p *prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
atomic.AddUint64(&p.reads, 1)
if ctx == nil {
panic("no context provided")
}
@ -82,7 +85,7 @@ func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]
}
return bytes.TrimPrefix(b, p.prefix), p.stale, p.err
}
func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
func (p *prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
if ctx == nil {
panic("no context provided")
}
@ -92,6 +95,10 @@ func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]by
return b, p.err
}
func (p *prefixTransformer) resetReads() {
p.reads = 0
}
func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
@ -502,11 +509,11 @@ func TestGuaranteedUpdate(t *testing.T) {
if tt.expectNoUpdate {
name = storeObj.Name
}
originalTransformer := store.transformer.(prefixTransformer)
originalTransformer := store.transformer.(*prefixTransformer)
if tt.transformStale {
transformer := originalTransformer
transformer := *originalTransformer
transformer.stale = true
store.transformer = transformer
store.transformer = &transformer
}
version := storeObj.ResourceVersion
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
@ -606,7 +613,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
t.Fatal(err)
}
store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix)}
store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix)}
// this update should write the canonical value to etcd because the new serialization differs
// from the stored serialization
@ -639,7 +646,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
t.Errorf("guaranteed update should have short-circuited write, got %#v", out)
}
store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true}
store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true}
// this update should write to etcd because the transformer reported stale
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
@ -752,7 +759,7 @@ func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
preset := []struct {
@ -782,7 +789,7 @@ func TestTransformationFailure(t *testing.T) {
// create a second resource with an invalid prefix
oldTransformer := store.transformer
store.transformer = prefixTransformer{prefix: []byte("otherprefix!")}
store.transformer = &prefixTransformer{prefix: []byte("otherprefix!")}
for i, ps := range preset[1:] {
preset[1:][i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
@ -829,8 +836,8 @@ func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
@ -1182,7 +1189,11 @@ func TestListContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer)
ctx := context.Background()
// Setup storage with the following structure:
@ -1247,6 +1258,14 @@ func TestListContinuation(t *testing.T) {
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) {
t.Fatalf("Unexpected first page: %#v", out.Items)
}
if transformer.reads != 1 {
t.Errorf("unexpected reads: %d", transformer.reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
transformer.resetReads()
recorder.resetReads()
continueFromSecondItem := out.Continue
@ -1263,6 +1282,14 @@ func TestListContinuation(t *testing.T) {
t.Logf("continue token was %d %s %v", rv, key, err)
t.Fatalf("Unexpected second page: %#v", out.Items)
}
if transformer.reads != 2 {
t.Errorf("unexpected reads: %d", transformer.reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
transformer.resetReads()
recorder.resetReads()
// limit, should get two more pages
out = &example.PodList{}
@ -1275,7 +1302,17 @@ func TestListContinuation(t *testing.T) {
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) {
t.Fatalf("Unexpected second page: %#v", out.Items)
}
if transformer.reads != 1 {
t.Errorf("unexpected reads: %d", transformer.reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
transformer.resetReads()
recorder.resetReads()
continueFromThirdItem := out.Continue
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
@ -1286,14 +1323,142 @@ func TestListContinuation(t *testing.T) {
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
t.Fatalf("Unexpected third page: %#v", out.Items)
}
if transformer.reads != 1 {
t.Errorf("unexpected reads: %d", transformer.reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
transformer.resetReads()
recorder.resetReads()
}
type clientRecorder struct {
reads uint64
clientv3.KV
}
func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
atomic.AddUint64(&r.reads, 1)
return r.KV.Get(ctx, key, opts...)
}
func (r *clientRecorder) resetReads() {
r.reads = 0
}
func TestListContinuationWithFilter(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer)
ctx := context.Background()
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{
{
key: "/1",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/2",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match
},
{
key: "/3",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/4",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// the first list call should try to get 2 items from etcd (and only those items should be returned)
// the field selector should result in it reading 3 items via the transformer
// the chunking should result in 2 etcd Gets
// there should be a continueValue because there is more data
out := &example.PodList{}
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
return storage.SelectionPredicate{
Limit: limit,
Continue: continueValue,
Label: labels.Everything(),
Field: fields.OneTermNotEqualSelector("metadata.name", "bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
},
}
}
if err := store.List(ctx, "/", "0", pred(2, ""), out); err != nil {
t.Errorf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
t.Errorf("No continuation token set")
}
if len(out.Items) != 2 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) || !reflect.DeepEqual(&out.Items[1], preset[2].storedObj) {
t.Errorf("Unexpected first page, len=%d: %#v", len(out.Items), out.Items)
}
if transformer.reads != 3 {
t.Errorf("unexpected reads: %d", transformer.reads)
}
if recorder.reads != 2 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
transformer.resetReads()
recorder.resetReads()
// the rest of the test does not make sense if the previous call failed
if t.Failed() {
return
}
cont := out.Continue
// the second list call should try to get 2 more items from etcd
// but since there is only one item left, that is all we should get with no continueValue
// both read counters should be incremented for the singular calls they make in this case
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(2, cont), out); err != nil {
t.Errorf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Errorf("Unexpected continuation token set")
}
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[3].storedObj) {
t.Errorf("Unexpected second page, len=%d: %#v", len(out.Items), out.Items)
}
if transformer.reads != 1 {
t.Errorf("unexpected reads: %d", transformer.reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
transformer.resetReads()
recorder.resetReads()
}
func TestListInconsistentContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
@ -1438,7 +1603,7 @@ func TestListInconsistentContinuation(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10
@ -1471,7 +1636,7 @@ func TestPrefix(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
transformer := prefixTransformer{prefix: []byte(defaultTestPrefix)}
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
testcases := map[string]string{
"custom/prefix": "/custom/prefix",
"/custom//prefix//": "/custom/prefix",

View File

@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")})
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")})
validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil