From da7124e5e5c0385dd5bcfc72ef035effc7708913 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 24 Sep 2017 18:06:57 -0400 Subject: [PATCH] Fill partial pages on the server rather than forcing client to The etcd3 storage now attempts to fill partial pages to prevent clients having to make more round trips (latency from server to etcd is lower than client to server). The server makes repeated requests to etcd of the current page size, then uses the filter function to eliminate any matches. After this change the apiserver will always return full pages, but we leave the language in place that clients must tolerate it. Reduces tail latency of large filtered lists, such as viewing pods assigned to a node. --- .../apiserver/pkg/storage/etcd3/store.go | 170 ++++++---- .../apiserver/pkg/storage/etcd3/store_test.go | 299 +++++++++++++++--- test/e2e/apimachinery/BUILD | 2 + test/e2e/apimachinery/chunking.go | 99 ++++++ 4 files changed, 464 insertions(+), 106 deletions(-) create mode 100644 test/e2e/apimachinery/chunking.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index e5a67f043ec..1b4bf0f9fec 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -396,11 +396,11 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if err != nil { return storage.NewInternalError(err.Error()) } - elems := []*elemForDecode{{ - data: data, - rev: uint64(getResp.Kvs[0].ModRevision), - }} - if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + panic("need ptr to slice") + } + if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil { return err } // update version with cluster level revision @@ -472,7 +472,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor if err != nil { return err } - key = path.Join(s.pathPrefix, key) + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + panic("need ptr to slice") + } + + if s.pathPrefix != "" { + key = path.Join(s.pathPrefix, key) + } // We need to make sure the key ended with "/" so that we only get children "directories". // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, // while with prefix "/a/" will return only "/a/b" which is the correct answer. @@ -481,81 +488,127 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor } keyPrefix := key + filter := storage.SimpleFilter(pred) + // set the appropriate clientv3 options to filter the returned data set + var paging bool options := make([]clientv3.OpOption, 0, 4) if s.pagingEnabled && pred.Limit > 0 { + paging = true options = append(options, clientv3.WithLimit(pred.Limit)) } + var returnedRV int64 switch { case s.pagingEnabled && len(pred.Continue) > 0: continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix) if err != nil { - return err + return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) } - options = append(options, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key))) + if len(resourceVersion) > 0 && resourceVersion != "0" { + return apierrors.NewBadRequest("specifying resource version is not allowed when using continue") + } + + rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) + options = append(options, clientv3.WithRange(rangeEnd)) key = continueKey options = append(options, clientv3.WithRev(continueRV)) returnedRV = continueRV - case len(resourceVersion) > 0: - fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) - if err != nil { - return fmt.Errorf("invalid resource version: %v", err) + case s.pagingEnabled && pred.Limit > 0: + if len(resourceVersion) > 0 { + fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + if fromRV > 0 { + options = append(options, clientv3.WithRev(fromRV)) + } + returnedRV = fromRV } - options = append(options, clientv3.WithPrefix(), clientv3.WithRev(fromRV)) - returnedRV = fromRV + rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) + options = append(options, clientv3.WithRange(rangeEnd)) default: + if len(resourceVersion) > 0 { + fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + if fromRV > 0 { + options = append(options, clientv3.WithRev(fromRV)) + } + returnedRV = fromRV + } + options = append(options, clientv3.WithPrefix()) } - getResp, err := s.client.KV.Get(ctx, key, options...) - if err != nil { - return interpretListError(err, len(pred.Continue) > 0) - } - - elems := make([]*elemForDecode, 0, len(getResp.Kvs)) - for _, kv := range getResp.Kvs { - data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key)) + // loop until we have filled the requested limit from etcd or there are no more results + var lastKey []byte + var hasMore bool + for { + getResp, err := s.client.KV.Get(ctx, key, options...) if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) - continue + return interpretListError(err, len(pred.Continue) > 0) + } + hasMore = getResp.More + + if len(getResp.Kvs) == 0 && getResp.More { + return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") } - elems = append(elems, &elemForDecode{ - data: data, - rev: uint64(kv.ModRevision), - }) + // take items from the response until the bucket is full, filtering as we go + for _, kv := range getResp.Kvs { + if paging && int64(v.Len()) >= pred.Limit { + hasMore = true + break + } + lastKey = kv.Key + + data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key)) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) + continue + } + + if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil { + return err + } + } + + // indicate to the client which resource version was returned + if returnedRV == 0 { + returnedRV = getResp.Header.Revision + } + + // no more results remain or we didn't request paging + if !hasMore || !paging { + break + } + // we're paging but we have filled our bucket + if int64(v.Len()) >= pred.Limit { + break + } + key = string(lastKey) + "\x00" } - if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { - return err - } - - // indicate to the client which resource version was returned - if returnedRV == 0 { - returnedRV = getResp.Header.Revision - } - switch { - case !getResp.More: - // no continuation - return s.versioner.UpdateList(listObj, uint64(returnedRV), "") - case len(getResp.Kvs) == 0: - return fmt.Errorf("no results were found, but etcd indicated there were more values") - default: + // instruct the client to begin querying from immediately after the last key we returned + // we never return a key that the client wouldn't be allowed to see + if hasMore { // we want to start immediately after the last key - // TODO: this reveals info about certain keys - key := string(getResp.Kvs[len(getResp.Kvs)-1].Key) - next, err := encodeContinue(key+"\x00", keyPrefix, returnedRV) + next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV) if err != nil { return err } return s.versioner.UpdateList(listObj, uint64(returnedRV), next) } + + // no continuation + return s.versioner.UpdateList(listObj, uint64(returnedRV), "") } // Watch implements storage.Interface.Watch. @@ -677,23 +730,16 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP return nil } -// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. -// On success, ListPtr would be set to the list of objects. -func decodeList(elems []*elemForDecode, filter storage.FilterFunc, listPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { - v, err := conversion.EnforcePtr(listPtr) - if err != nil || v.Kind() != reflect.Slice { - panic("need ptr to slice") +// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice. +func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error { + obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) + if err != nil { + return err } - for _, elem := range elems { - obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) - if err != nil { - return err - } - // being unable to set the version does not prevent the object from being extracted - versioner.UpdateObject(obj, elem.rev) - if filter(obj) { - v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) - } + // being unable to set the version does not prevent the object from being extracted + versioner.UpdateObject(obj, rev) + if filter(obj) { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 6ee72431d9c..7e7320cfba8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -727,25 +727,44 @@ func TestList(t *testing.T) { // | - test // | // - two-level/ - // - 1/ + // | - 1/ + // | | - test + // | | + // | - 2/ + // | - test + // | + // - z-level/ + // - 3/ // | - test // | - // - 2/ - // - test + // - 3/ + // - test-2 preset := []struct { key string obj *example.Pod storedObj *example.Pod - }{{ - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }} + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + { + key: "/z-level/3/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "fourth"}}, + }, + { + key: "/z-level/3/test-2", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } for i, ps := range preset { preset[i].storedObj = &example.Pod{} @@ -763,110 +782,302 @@ func TestList(t *testing.T) { t.Fatal(err) } + getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + } + tests := []struct { + name string disablePaging bool + rv string prefix string pred storage.SelectionPredicate expectedOut []*example.Pod expectContinue bool + expectError bool }{ - { // test List on existing key + { + name: "rejects invalid resource version", + prefix: "/", + pred: storage.Everything, + rv: "abc", + expectError: true, + }, + { + name: "rejects resource version and continue token", + prefix: "/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + Continue: secondContinuation, + }, + rv: "1", + expectError: true, + }, + { + name: "test List on existing key", prefix: "/one-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[0].storedObj}, }, - { // test List on non-existing key + { + name: "test List on non-existing key", prefix: "/non-existing/", pred: storage.Everything, expectedOut: nil, }, - { // test List with pod name matching + { + name: "test List with pod name matching", prefix: "/one-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, + Field: fields.ParseSelectorOrDie("metadata.name!=foo"), }, expectedOut: nil, }, - { // test List with limit + { + name: "test List with limit", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[1].storedObj}, expectContinue: true, }, - { // test List with limit when paging disabled + { + name: "test List with limit when paging disabled", disablePaging: true, prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, expectContinue: false, }, - { // test List with pregenerated continue token + { + name: "test List with pregenerated continue token", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, Continue: secondContinuation, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[2].storedObj}, }, - { // test List with multiple levels of directories and expect flattened result + { + name: "ignores resource version 0 for List with pregenerated continue token", + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + Continue: secondContinuation, + }, + rv: "0", + expectedOut: []*example.Pod{preset[2].storedObj}, + }, + { + name: "test List with multiple levels of directories and expect flattened result", prefix: "/two-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, }, + { + name: "test List with filter returning only one item, ensure only a single page returned", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 1, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: true, + }, + { + name: "test List with filter returning only one item, covers the entire list", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 2, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: false, + }, + { + name: "test List with filter returning only one item, covers the entire list, with resource version 0", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 2, + }, + rv: "0", + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: false, + }, + { + name: "test List with filter returning two items, more pages possible", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "foo"), + Label: labels.Everything(), + Limit: 2, + }, + expectContinue: true, + expectedOut: []*example.Pod{preset[0].storedObj, preset[1].storedObj}, + }, + { + name: "filter returns two items split across multiple pages", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + }, + expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, ends on last item, not full", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, starts on last item, full", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 1, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, starts on last item, partial page", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns two items, page size equal to total list size", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 5, + }, + expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, + }, + { + name: "filter returns one item, page size equal to total list size", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 5, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + }, } - for i, tt := range tests { + for _, tt := range tests { + if tt.pred.GetAttrs == nil { + tt.pred.GetAttrs = getAttrs + } + out := &example.PodList{} var err error if tt.disablePaging { - err = disablePagingStore.List(ctx, tt.prefix, "0", tt.pred, out) + err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out) } else { - err = store.List(ctx, tt.prefix, "0", tt.pred, out) + err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out) + } + if (err != nil) != tt.expectError { + t.Errorf("(%s): List failed: %v", tt.name, err) } if err != nil { - t.Fatalf("#%d: List failed: %v", i, err) + continue } if (len(out.Continue) > 0) != tt.expectContinue { - t.Errorf("#%d: unexpected continue token: %v", i, out.Continue) + t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue) } if len(tt.expectedOut) != len(out.Items) { - t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items)) continue } for j, wantPod := range tt.expectedOut { getPod := &out.Items[j] if !reflect.DeepEqual(wantPod, getPod) { - t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod) } } } +} + +func TestListContinuation(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } // test continuations out := &example.PodList{} diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index 115fcf5c7cb..a96b152e380 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -9,6 +9,7 @@ go_library( name = "go_default_library", srcs = [ "aggregator.go", + "chunking.go", "custom_resource_definition.go", "etcd_failure.go", "framework.go", @@ -58,6 +59,7 @@ go_library( "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", "//vendor/k8s.io/client-go/util/retry:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", "//vendor/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library", ], diff --git a/test/e2e/apimachinery/chunking.go b/test/e2e/apimachinery/chunking.go new file mode 100644 index 00000000000..3892ad353be --- /dev/null +++ b/test/e2e/apimachinery/chunking.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apimachinery + +import ( + "fmt" + "math/rand" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/test/e2e/framework" +) + +const numberOfTotalResources = 400 + +var _ = SIGDescribe("Servers with support for API chunking", func() { + f := framework.NewDefaultFramework("chunking") + + It("should return chunks of results for list calls", func() { + ns := f.Namespace.Name + c := f.ClientSet + client := c.Core().PodTemplates(ns) + + By("creating a large number of resources") + workqueue.Parallelize(20, numberOfTotalResources, func(i int) { + for tries := 3; tries >= 0; tries-- { + _, err := client.Create(&v1.PodTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("template-%04d", i), + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Name: "test", Image: "test2"}, + }, + }, + }, + }) + if err == nil { + return + } + framework.Logf("Got an error creating template %d: %v", i, err) + } + Fail("Unable to create template %d, exiting", i) + }) + + By("retrieving those results in paged fashion several times") + for i := 0; i < 3; i++ { + opts := metav1.ListOptions{} + found := 0 + var lastRV string + for { + opts.Limit = int64(rand.Int31n(numberOfTotalResources/10) + 1) + list, err := client.List(opts) + Expect(err).ToNot(HaveOccurred()) + framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, list.Continue) + + if len(lastRV) == 0 { + lastRV = list.ResourceVersion + } + if lastRV != list.ResourceVersion { + Expect(list.ResourceVersion).To(Equal(lastRV)) + } + for _, item := range list.Items { + Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found))) + found++ + } + if len(list.Continue) == 0 { + break + } + opts.Continue = list.Continue + } + Expect(found).To(BeNumerically("==", numberOfTotalResources)) + } + + By("retrieving those results all at once") + list, err := client.List(metav1.ListOptions{Limit: numberOfTotalResources + 1}) + Expect(err).ToNot(HaveOccurred()) + Expect(list.Items).To(HaveLen(numberOfTotalResources)) + }) +})