Fill partial pages on the server rather than forcing client to

The etcd3 storage now attempts to fill partial pages to prevent clients
having to make more round trips (latency from server to etcd is lower
than client to server). The server makes repeated requests to etcd of
the current page size, then uses the filter function to eliminate any
matches. After this change the apiserver will always return full pages,
but we leave the language in place that clients must tolerate it.

Reduces tail latency of large filtered lists, such as viewing pods
assigned to a node.
This commit is contained in:
Clayton Coleman 2017-09-24 18:06:57 -04:00
parent 113889e72d
commit da7124e5e5
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
4 changed files with 464 additions and 106 deletions

View File

@ -396,11 +396,11 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if err != nil {
return storage.NewInternalError(err.Error())
}
elems := []*elemForDecode{{
data: data,
rev: uint64(getResp.Kvs[0].ModRevision),
}}
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil {
return err
}
// update version with cluster level revision
@ -472,7 +472,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
if s.pathPrefix != "" {
key = path.Join(s.pathPrefix, key)
}
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
@ -481,81 +488,127 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
}
keyPrefix := key
filter := storage.SimpleFilter(pred)
// set the appropriate clientv3 options to filter the returned data set
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(pred.Limit))
}
var returnedRV int64
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix)
if err != nil {
return err
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
options = append(options, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)))
if len(resourceVersion) > 0 && resourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
key = continueKey
options = append(options, clientv3.WithRev(continueRV))
returnedRV = continueRV
case len(resourceVersion) > 0:
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
if err != nil {
return fmt.Errorf("invalid resource version: %v", err)
case s.pagingEnabled && pred.Limit > 0:
if len(resourceVersion) > 0 {
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV))
}
returnedRV = fromRV
}
options = append(options, clientv3.WithPrefix(), clientv3.WithRev(fromRV))
returnedRV = fromRV
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
default:
if len(resourceVersion) > 0 {
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV))
}
returnedRV = fromRV
}
options = append(options, clientv3.WithPrefix())
}
getResp, err := s.client.KV.Get(ctx, key, options...)
if err != nil {
return interpretListError(err, len(pred.Continue) > 0)
}
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
for _, kv := range getResp.Kvs {
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
var hasMore bool
for {
getResp, err := s.client.KV.Get(ctx, key, options...)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err))
continue
return interpretListError(err, len(pred.Continue) > 0)
}
hasMore = getResp.More
if len(getResp.Kvs) == 0 && getResp.More {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
}
elems = append(elems, &elemForDecode{
data: data,
rev: uint64(kv.ModRevision),
})
// take items from the response until the bucket is full, filtering as we go
for _, kv := range getResp.Kvs {
if paging && int64(v.Len()) >= pred.Limit {
hasMore = true
break
}
lastKey = kv.Key
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err))
continue
}
if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil {
return err
}
}
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
// no more results remain or we didn't request paging
if !hasMore || !paging {
break
}
// we're paging but we have filled our bucket
if int64(v.Len()) >= pred.Limit {
break
}
key = string(lastKey) + "\x00"
}
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
return err
}
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
switch {
case !getResp.More:
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
case len(getResp.Kvs) == 0:
return fmt.Errorf("no results were found, but etcd indicated there were more values")
default:
// instruct the client to begin querying from immediately after the last key we returned
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
// TODO: this reveals info about certain keys
key := string(getResp.Kvs[len(getResp.Kvs)-1].Key)
next, err := encodeContinue(key+"\x00", keyPrefix, returnedRV)
next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
if err != nil {
return err
}
return s.versioner.UpdateList(listObj, uint64(returnedRV), next)
}
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
}
// Watch implements storage.Interface.Watch.
@ -677,23 +730,16 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
return nil
}
// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
// On success, ListPtr would be set to the list of objects.
func decodeList(elems []*elemForDecode, filter storage.FilterFunc, listPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error {
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
for _, elem := range elems {
obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, elem.rev)
if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, rev)
if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
return nil
}

View File

@ -727,25 +727,44 @@ func TestList(t *testing.T) {
// | - test
// |
// - two-level/
// - 1/
// | - 1/
// | | - test
// | |
// | - 2/
// | - test
// |
// - z-level/
// - 3/
// | - test
// |
// - 2/
// - test
// - 3/
// - test-2
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{{
key: "/one-level/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
}, {
key: "/two-level/1/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
}, {
key: "/two-level/2/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
}}
}{
{
key: "/one-level/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/1/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/2/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
{
key: "/z-level/3/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "fourth"}},
},
{
key: "/z-level/3/test-2",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
@ -763,110 +782,302 @@ func TestList(t *testing.T) {
t.Fatal(err)
}
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
}
tests := []struct {
name string
disablePaging bool
rv string
prefix string
pred storage.SelectionPredicate
expectedOut []*example.Pod
expectContinue bool
expectError bool
}{
{ // test List on existing key
{
name: "rejects invalid resource version",
prefix: "/",
pred: storage.Everything,
rv: "abc",
expectError: true,
},
{
name: "rejects resource version and continue token",
prefix: "/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
Continue: secondContinuation,
},
rv: "1",
expectError: true,
},
{
name: "test List on existing key",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
},
{ // test List on non-existing key
{
name: "test List on non-existing key",
prefix: "/non-existing/",
pred: storage.Everything,
expectedOut: nil,
},
{ // test List with pod name matching
{
name: "test List with pod name matching",
prefix: "/one-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
Field: fields.ParseSelectorOrDie("metadata.name!=foo"),
},
expectedOut: nil,
},
{ // test List with limit
{
name: "test List with limit",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
},
{ // test List with limit when paging disabled
{
name: "test List with limit when paging disabled",
disablePaging: true,
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
expectContinue: false,
},
{ // test List with pregenerated continue token
{
name: "test List with pregenerated continue token",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
Continue: secondContinuation,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: []*example.Pod{preset[2].storedObj},
},
{ // test List with multiple levels of directories and expect flattened result
{
name: "ignores resource version 0 for List with pregenerated continue token",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
Continue: secondContinuation,
},
rv: "0",
expectedOut: []*example.Pod{preset[2].storedObj},
},
{
name: "test List with multiple levels of directories and expect flattened result",
prefix: "/two-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
},
{
name: "test List with filter returning only one item, ensure only a single page returned",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[3].storedObj},
expectContinue: true,
},
{
name: "test List with filter returning only one item, covers the entire list",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 2,
},
expectedOut: []*example.Pod{preset[3].storedObj},
expectContinue: false,
},
{
name: "test List with filter returning only one item, covers the entire list, with resource version 0",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 2,
},
rv: "0",
expectedOut: []*example.Pod{preset[3].storedObj},
expectContinue: false,
},
{
name: "test List with filter returning two items, more pages possible",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "foo"),
Label: labels.Everything(),
Limit: 2,
},
expectContinue: true,
expectedOut: []*example.Pod{preset[0].storedObj, preset[1].storedObj},
},
{
name: "filter returns two items split across multiple pages",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
},
expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj},
},
{
name: "filter returns one item for last page, ends on last item, not full",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
{
name: "filter returns one item for last page, starts on last item, full",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 1,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
{
name: "filter returns one item for last page, starts on last item, partial page",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
{
name: "filter returns two items, page size equal to total list size",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 5,
},
expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj},
},
{
name: "filter returns one item, page size equal to total list size",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 5,
},
expectedOut: []*example.Pod{preset[3].storedObj},
},
}
for i, tt := range tests {
for _, tt := range tests {
if tt.pred.GetAttrs == nil {
tt.pred.GetAttrs = getAttrs
}
out := &example.PodList{}
var err error
if tt.disablePaging {
err = disablePagingStore.List(ctx, tt.prefix, "0", tt.pred, out)
err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out)
} else {
err = store.List(ctx, tt.prefix, "0", tt.pred, out)
err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out)
}
if (err != nil) != tt.expectError {
t.Errorf("(%s): List failed: %v", tt.name, err)
}
if err != nil {
t.Fatalf("#%d: List failed: %v", i, err)
continue
}
if (len(out.Continue) > 0) != tt.expectContinue {
t.Errorf("#%d: unexpected continue token: %v", i, out.Continue)
t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue)
}
if len(tt.expectedOut) != len(out.Items) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items))
continue
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod)
}
}
}
}
func TestListContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
// /
// - one-level/
// | - test
// |
// - two-level/
// - 1/
// | - test
// |
// - 2/
// - test
//
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{
{
key: "/one-level/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/1/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/2/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// test continuations
out := &example.PodList{}

View File

@ -9,6 +9,7 @@ go_library(
name = "go_default_library",
srcs = [
"aggregator.go",
"chunking.go",
"custom_resource_definition.go",
"etcd_failure.go",
"framework.go",
@ -58,6 +59,7 @@ go_library(
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/util/cert:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library",
"//vendor/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library",
],

View File

@ -0,0 +1,99 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"fmt"
"math/rand"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/test/e2e/framework"
)
const numberOfTotalResources = 400
var _ = SIGDescribe("Servers with support for API chunking", func() {
f := framework.NewDefaultFramework("chunking")
It("should return chunks of results for list calls", func() {
ns := f.Namespace.Name
c := f.ClientSet
client := c.Core().PodTemplates(ns)
By("creating a large number of resources")
workqueue.Parallelize(20, numberOfTotalResources, func(i int) {
for tries := 3; tries >= 0; tries-- {
_, err := client.Create(&v1.PodTemplate{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("template-%04d", i),
},
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{Name: "test", Image: "test2"},
},
},
},
})
if err == nil {
return
}
framework.Logf("Got an error creating template %d: %v", i, err)
}
Fail("Unable to create template %d, exiting", i)
})
By("retrieving those results in paged fashion several times")
for i := 0; i < 3; i++ {
opts := metav1.ListOptions{}
found := 0
var lastRV string
for {
opts.Limit = int64(rand.Int31n(numberOfTotalResources/10) + 1)
list, err := client.List(opts)
Expect(err).ToNot(HaveOccurred())
framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, list.Continue)
if len(lastRV) == 0 {
lastRV = list.ResourceVersion
}
if lastRV != list.ResourceVersion {
Expect(list.ResourceVersion).To(Equal(lastRV))
}
for _, item := range list.Items {
Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found)))
found++
}
if len(list.Continue) == 0 {
break
}
opts.Continue = list.Continue
}
Expect(found).To(BeNumerically("==", numberOfTotalResources))
}
By("retrieving those results all at once")
list, err := client.List(metav1.ListOptions{Limit: numberOfTotalResources + 1})
Expect(err).ToNot(HaveOccurred())
Expect(list.Items).To(HaveLen(numberOfTotalResources))
})
})