Refactor pagination tests

This commit is contained in:
Wojciech Tyczyński 2022-10-26 12:18:21 +02:00
parent 8472e1bc13
commit 6c8ce894e1
2 changed files with 323 additions and 305 deletions

View File

@ -202,212 +202,6 @@ func TestListWithoutPaging(t *testing.T) {
storagetesting.RunTestListWithoutPaging(ctx, t, store)
}
func TestListContinuation(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
transformer := store.transformer.(*storagetesting.PrefixTransformer)
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
// Setup storage with the following structure:
// /
// - one-level/
// | - test
// |
// - two-level/
// - 1/
// | - test
// |
// - 2/
// - test
//
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{
{
key: "/one-level/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/1/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/2/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// test continuations
out := &example.PodList{}
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
return storage.SelectionPredicate{
Limit: limit,
Continue: continueValue,
Label: labels.Everything(),
Field: fields.Everything(),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
},
}
}
options := storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(1, ""),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
t.Fatalf("No continuation token set")
}
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
if reads := transformer.GetReadsAndReset(); reads != 1 {
t.Errorf("unexpected reads: %d", reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
recorder.resetReads()
continueFromSecondItem := out.Continue
// no limit, should get two items
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(0, continueFromSecondItem),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/")
t.Logf("continue token was %d %s %v", rv, key, err)
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
if reads := transformer.GetReadsAndReset(); reads != 2 {
t.Errorf("unexpected reads: %d", reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
recorder.resetReads()
// limit, should get two more pages
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(1, continueFromSecondItem),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) == 0 {
t.Fatalf("No continuation token set")
}
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
if reads := transformer.GetReadsAndReset(); reads != 1 {
t.Errorf("unexpected reads: %d", reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
recorder.resetReads()
continueFromThirdItem := out.Continue
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(1, continueFromThirdItem),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
if reads := transformer.GetReadsAndReset(); reads != 1 {
t.Errorf("unexpected reads: %d", reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
}
func TestListPaginationRareObject(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
transformer := store.transformer.(*storagetesting.PrefixTransformer)
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
podCount := 1000
var pods []*example.Pod
for i := 0; i < podCount; i++ {
key := fmt.Sprintf("/one-level/pod-%d", i)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}}
storedObj := &example.Pod{}
err := store.Create(ctx, key, obj, storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
pods = append(pods, storedObj)
}
out := &example.PodList{}
options := storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
Label: labels.Everything(),
Field: fields.OneTermEqualSelector("metadata.name", "pod-999"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
},
},
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err)
}
if len(out.Continue) != 0 {
t.Errorf("Unexpected continuation token set")
}
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) {
t.Fatalf("Unexpected first page: %#v", out.Items)
}
if reads := transformer.GetReadsAndReset(); reads != uint64(podCount) {
t.Errorf("unexpected reads: %d", reads)
}
// We expect that kube-apiserver will be increasing page sizes
// if not full pages are received, so we should see significantly less
// than 1000 pages (which would be result of talking to etcd with page size
// copied from pred.Limit).
// The expected number of calls is n+1 where n is the smallest n so that:
// pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount.
// For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024.
if recorder.reads != 10 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
}
type clientRecorder struct {
reads uint64
clientv3.KV
@ -418,8 +212,60 @@ func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.O
return r.KV.Get(ctx, key, opts...)
}
func (r *clientRecorder) resetReads() {
r.reads = 0
func (r *clientRecorder) GetReadsAndReset() uint64 {
result := atomic.LoadUint64(&r.reads)
atomic.StoreUint64(&r.reads, 0)
return result
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects {
t.Errorf("unexpected reads: %d, expected: %d", reads, estimatedProcessedObjects)
}
estimatedGetCalls := uint64(1)
if pageSize != 0 {
// We expect that kube-apiserver will be increasing page sizes
// if not full pages are received, so we should see significantly less
// than 1000 pages (which would be result of talking to etcd with page size
// copied from pred.Limit).
// The expected number of calls is n+1 where n is the smallest n so that:
// pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount.
// For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024.
currLimit := pageSize
for sum := uint64(1); sum < estimatedProcessedObjects; {
currLimit *= 2
if currLimit > maxLimit {
currLimit = maxLimit
}
sum += currLimit
estimatedGetCalls++
}
}
if reads := recorder.GetReadsAndReset(); reads != estimatedGetCalls {
t.Errorf("unexpected reads: %d", reads)
}
}
}
func TestListContinuation(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
transformer := store.transformer.(*storagetesting.PrefixTransformer)
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
validation := checkStorageCallsInvariants(transformer, recorder)
storagetesting.RunTestListContinuation(ctx, t, store, validation)
}
func TestListPaginationRareObject(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
transformer := store.transformer.(*storagetesting.PrefixTransformer)
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
validation := checkStorageCallsInvariants(transformer, recorder)
storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation)
}
func TestListContinuationWithFilter(t *testing.T) {
@ -427,104 +273,9 @@ func TestListContinuationWithFilter(t *testing.T) {
transformer := store.transformer.(*storagetesting.PrefixTransformer)
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
validation := checkStorageCallsInvariants(transformer, recorder)
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{
{
key: "/1",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/2",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match
},
{
key: "/3",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/4",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// the first list call should try to get 2 items from etcd (and only those items should be returned)
// the field selector should result in it reading 3 items via the transformer
// the chunking should result in 2 etcd Gets
// there should be a continueValue because there is more data
out := &example.PodList{}
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
return storage.SelectionPredicate{
Limit: limit,
Continue: continueValue,
Label: labels.Everything(),
Field: fields.OneTermNotEqualSelector("metadata.name", "bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
},
}
}
options := storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(2, ""),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Errorf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
t.Errorf("No continuation token set")
}
storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items)
if reads := transformer.GetReadsAndReset(); reads != 3 {
t.Errorf("unexpected reads: %d", reads)
}
if recorder.reads != 2 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
recorder.resetReads()
// the rest of the test does not make sense if the previous call failed
if t.Failed() {
return
}
cont := out.Continue
// the second list call should try to get 2 more items from etcd
// but since there is only one item left, that is all we should get with no continueValue
// both read counters should be incremented for the singular calls they make in this case
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(2, cont),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Errorf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Errorf("Unexpected continuation token set")
}
storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items)
if reads := transformer.GetReadsAndReset(); reads != 1 {
t.Errorf("unexpected reads: %d", reads)
}
if recorder.reads != 1 {
t.Errorf("unexpected reads: %d", recorder.reads)
}
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
}
func TestListInconsistentContinuation(t *testing.T) {

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
"reflect"
"strconv"
"sync"
"testing"
@ -1143,6 +1144,272 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
}
}
type CallsValidation func(t *testing.T, pageSize, estimatedProcessedObjects uint64)
func RunTestListContinuation(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) {
// Setup storage with the following structure:
// /
// - one-level/
// | - test
// |
// - two-level/
// - 1/
// | - test
// |
// - 2/
// - test
//
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{
{
key: "/one-level/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/1/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/2/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// test continuations
out := &example.PodList{}
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
return storage.SelectionPredicate{
Limit: limit,
Continue: continueValue,
Label: labels.Everything(),
Field: fields.Everything(),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
},
}
}
options := storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(1, ""),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
t.Fatalf("No continuation token set")
}
ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items)
if validation != nil {
validation(t, 1, 1)
}
continueFromSecondItem := out.Continue
// no limit, should get two items
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(0, continueFromSecondItem),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
key, rv, err := storage.DecodeContinue(continueFromSecondItem, "/")
t.Logf("continue token was %d %s %v", rv, key, err)
ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj, *preset[2].storedObj}, out.Items)
if validation != nil {
validation(t, 0, 2)
}
// limit, should get two more pages
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(1, continueFromSecondItem),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) == 0 {
t.Fatalf("No continuation token set")
}
ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
if validation != nil {
validation(t, 1, 1)
}
continueFromThirdItem := out.Continue
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(1, continueFromThirdItem),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
if validation != nil {
validation(t, 1, 1)
}
}
func RunTestListPaginationRareObject(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) {
podCount := 1000
var pods []*example.Pod
for i := 0; i < podCount; i++ {
key := fmt.Sprintf("/one-level/pod-%d", i)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}}
storedObj := &example.Pod{}
err := store.Create(ctx, key, obj, storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
pods = append(pods, storedObj)
}
out := &example.PodList{}
options := storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
Label: labels.Everything(),
Field: fields.OneTermEqualSelector("metadata.name", "pod-999"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
},
},
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err)
}
if len(out.Continue) != 0 {
t.Errorf("Unexpected continuation token set")
}
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) {
t.Fatalf("Unexpected first page: %#v", out.Items)
}
if validation != nil {
validation(t, 1, uint64(podCount))
}
}
func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) {
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{
{
key: "/1",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/2",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match
},
{
key: "/3",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/4",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// the first list call should try to get 2 items from etcd (and only those items should be returned)
// the field selector should result in it reading 3 items via the transformer
// the chunking should result in 2 etcd Gets
// there should be a continueValue because there is more data
out := &example.PodList{}
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
return storage.SelectionPredicate{
Limit: limit,
Continue: continueValue,
Label: labels.Everything(),
Field: fields.OneTermNotEqualSelector("metadata.name", "bar"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, nil
},
}
}
options := storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(2, ""),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Errorf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
t.Errorf("No continuation token set")
}
ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj, *preset[2].storedObj}, out.Items)
if validation != nil {
validation(t, 2, 3)
}
// the rest of the test does not make sense if the previous call failed
if t.Failed() {
return
}
cont := out.Continue
// the second list call should try to get 2 more items from etcd
// but since there is only one item left, that is all we should get with no continueValue
// both read counters should be incremented for the singular calls they make in this case
out = &example.PodList{}
options = storage.ListOptions{
ResourceVersion: "0",
Predicate: pred(2, cont),
Recursive: true,
}
if err := store.GetList(ctx, "/", options, out); err != nil {
t.Errorf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Errorf("Unexpected continuation token set")
}
ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[3].storedObj}, out.Items)
if validation != nil {
validation(t, 2, 1)
}
}
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
type InterfaceWithPrefixTransformer interface {