diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 35a90a1ac25..7033678cc69 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -201,7 +201,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS genericfeatures.AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta}, genericfeatures.APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, genericfeatures.Initializers: {Default: false, PreRelease: utilfeature.Alpha}, - genericfeatures.APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, + genericfeatures.APIListChunking: {Default: true, PreRelease: utilfeature.Beta}, // inherited features from apiextensions-apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index d5b52fd8758..57bab8b0024 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -58,6 +58,7 @@ const ( // owner: @smarterclayton // alpha: v1.8 + // beta: v1.9 // // Allow API clients to retrieve resource lists in chunks rather than // all at once. @@ -76,5 +77,5 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta}, APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, Initializers: {Default: false, PreRelease: utilfeature.Alpha}, - APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, + APIListChunking: {Default: true, PreRelease: utilfeature.Beta}, } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index 6d0b1040e99..66bb1912b6a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -468,10 +468,15 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri // Implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) - if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) { + hasContinuation := pagingEnabled && len(pred.Continue) > 0 + hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" + if resourceVersion == "" || hasContinuation || hasLimit { // If resourceVersion is not specified, serve it from underlying - // storage (for backward compatibility). If a continuation or limit is + // storage (for backward compatibility). If a continuation is // requested, serve it from the underlying storage as well. + // Limits are only sent to storage when resourceVersion is non-zero + // since the watch cache isn't able to perform continuations, and + // limits are ignored when resource version is zero. return c.storage.List(ctx, key, resourceVersion, pred, listObj) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index ad5b06d819c..2a6ce84be24 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -18,6 +18,7 @@ go_test( "//vendor/github.com/coreos/etcd/clientv3:go_default_library", "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", "//vendor/github.com/coreos/etcd/integration:go_default_library", + "//vendor/github.com/coreos/pkg/capnslog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index e5a67f043ec..b0acc840165 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -396,11 +396,11 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if err != nil { return storage.NewInternalError(err.Error()) } - elems := []*elemForDecode{{ - data: data, - rev: uint64(getResp.Kvs[0].ModRevision), - }} - if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + panic("need ptr to slice") + } + if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil { return err } // update version with cluster level revision @@ -429,25 +429,26 @@ func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, return "", 0, fmt.Errorf("continue key is not valid: %v", err) } switch c.APIVersion { - case "v1alpha1": + case "meta.k8s.io/v1": if c.ResourceVersion == 0 { - return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version v1alpha1)") + return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)") } if len(c.StartKey) == 0 { - return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version v1alpha1)") + return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)") } // defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot // be at a higher level of the hierarchy, and so when we append the key prefix we will end up with // continue start key that is fully qualified and cannot range over anything less specific than // keyPrefix. - cleaned := path.Clean(c.StartKey) - if cleaned != c.StartKey || cleaned == "." || cleaned == "/" { - return "", 0, fmt.Errorf("continue key is not valid: %s", cleaned) + key := c.StartKey + if !strings.HasPrefix(key, "/") { + key = "/" + key } - if len(cleaned) == 0 { - return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version 0)") + cleaned := path.Clean(key) + if cleaned != key { + return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey) } - return keyPrefix + cleaned, c.ResourceVersion, nil + return keyPrefix + cleaned[1:], c.ResourceVersion, nil default: return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion) } @@ -459,7 +460,7 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error if nextKey == key { return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match") } - out, err := json.Marshal(&continueToken{APIVersion: "v1alpha1", ResourceVersion: resourceVersion, StartKey: nextKey}) + out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey}) if err != nil { return "", err } @@ -472,7 +473,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor if err != nil { return err } - key = path.Join(s.pathPrefix, key) + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + panic("need ptr to slice") + } + + if s.pathPrefix != "" { + key = path.Join(s.pathPrefix, key) + } // We need to make sure the key ended with "/" so that we only get children "directories". // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, // while with prefix "/a/" will return only "/a/b" which is the correct answer. @@ -481,81 +489,166 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor } keyPrefix := key + filter := storage.SimpleFilter(pred) + // set the appropriate clientv3 options to filter the returned data set + var paging bool options := make([]clientv3.OpOption, 0, 4) if s.pagingEnabled && pred.Limit > 0 { + paging = true options = append(options, clientv3.WithLimit(pred.Limit)) } + var returnedRV int64 switch { case s.pagingEnabled && len(pred.Continue) > 0: continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix) if err != nil { - return err + return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) } - options = append(options, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key))) + if len(resourceVersion) > 0 && resourceVersion != "0" { + return apierrors.NewBadRequest("specifying resource version is not allowed when using continue") + } + + rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) + options = append(options, clientv3.WithRange(rangeEnd)) key = continueKey options = append(options, clientv3.WithRev(continueRV)) returnedRV = continueRV - case len(resourceVersion) > 0: - fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) - if err != nil { - return fmt.Errorf("invalid resource version: %v", err) + case s.pagingEnabled && pred.Limit > 0: + if len(resourceVersion) > 0 { + fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + if fromRV > 0 { + options = append(options, clientv3.WithRev(fromRV)) + } + returnedRV = fromRV } - options = append(options, clientv3.WithPrefix(), clientv3.WithRev(fromRV)) - returnedRV = fromRV + rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) + options = append(options, clientv3.WithRange(rangeEnd)) default: + if len(resourceVersion) > 0 { + fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + if fromRV > 0 { + options = append(options, clientv3.WithRev(fromRV)) + } + returnedRV = fromRV + } + options = append(options, clientv3.WithPrefix()) } - getResp, err := s.client.KV.Get(ctx, key, options...) - if err != nil { - return interpretListError(err, len(pred.Continue) > 0) - } - - elems := make([]*elemForDecode, 0, len(getResp.Kvs)) - for _, kv := range getResp.Kvs { - data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key)) + // loop until we have filled the requested limit from etcd or there are no more results + var lastKey []byte + var hasMore bool + for { + getResp, err := s.client.KV.Get(ctx, key, options...) if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) - continue + return interpretListError(err, len(pred.Continue) > 0) + } + hasMore = getResp.More + + if len(getResp.Kvs) == 0 && getResp.More { + return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") } - elems = append(elems, &elemForDecode{ - data: data, - rev: uint64(kv.ModRevision), - }) + // avoid small allocations for the result slice, since this can be called in many + // different contexts and we don't know how significantly the result will be filtered + if pred.Empty() { + growSlice(v, len(getResp.Kvs)) + } else { + growSlice(v, 2048, len(getResp.Kvs)) + } + + // take items from the response until the bucket is full, filtering as we go + for _, kv := range getResp.Kvs { + if paging && int64(v.Len()) >= pred.Limit { + hasMore = true + break + } + lastKey = kv.Key + + data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key)) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) + continue + } + + if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil { + return err + } + } + + // indicate to the client which resource version was returned + if returnedRV == 0 { + returnedRV = getResp.Header.Revision + } + + // no more results remain or we didn't request paging + if !hasMore || !paging { + break + } + // we're paging but we have filled our bucket + if int64(v.Len()) >= pred.Limit { + break + } + key = string(lastKey) + "\x00" } - if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { - return err - } - - // indicate to the client which resource version was returned - if returnedRV == 0 { - returnedRV = getResp.Header.Revision - } - switch { - case !getResp.More: - // no continuation - return s.versioner.UpdateList(listObj, uint64(returnedRV), "") - case len(getResp.Kvs) == 0: - return fmt.Errorf("no results were found, but etcd indicated there were more values") - default: + // instruct the client to begin querying from immediately after the last key we returned + // we never return a key that the client wouldn't be allowed to see + if hasMore { // we want to start immediately after the last key - // TODO: this reveals info about certain keys - key := string(getResp.Kvs[len(getResp.Kvs)-1].Key) - next, err := encodeContinue(key+"\x00", keyPrefix, returnedRV) + next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV) if err != nil { return err } return s.versioner.UpdateList(listObj, uint64(returnedRV), next) } + + // no continuation + return s.versioner.UpdateList(listObj, uint64(returnedRV), "") +} + +// growSlice takes a slice value and grows its capacity up +// to the maximum of the passed sizes or maxCapacity, whichever +// is smaller. Above maxCapacity decisions about allocation are left +// to the Go runtime on append. This allows a caller to make an +// educated guess about the potential size of the total list while +// still avoiding overly aggressive initial allocation. If sizes +// is empty maxCapacity will be used as the size to grow. +func growSlice(v reflect.Value, maxCapacity int, sizes ...int) { + cap := v.Cap() + max := cap + for _, size := range sizes { + if size > max { + max = size + } + } + if len(sizes) == 0 || max > maxCapacity { + max = maxCapacity + } + if max <= cap { + return + } + if v.Len() > 0 { + extra := reflect.MakeSlice(v.Type(), 0, max) + reflect.Copy(extra, v) + v.Set(extra) + } else { + extra := reflect.MakeSlice(v.Type(), 0, max) + v.Set(extra) + } } // Watch implements storage.Interface.Watch. @@ -677,23 +770,16 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP return nil } -// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. -// On success, ListPtr would be set to the list of objects. -func decodeList(elems []*elemForDecode, filter storage.FilterFunc, listPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { - v, err := conversion.EnforcePtr(listPtr) - if err != nil || v.Kind() != reflect.Slice { - panic("need ptr to slice") +// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice. +func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error { + obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) + if err != nil { + return err } - for _, elem := range elems { - obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) - if err != nil { - return err - } - // being unable to set the version does not prevent the object from being extracted - versioner.UpdateObject(obj, elem.rev) - if filter(obj) { - v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) - } + // being unable to set the version does not prevent the object from being extracted + versioner.UpdateObject(obj, rev) + if filter(obj) { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 6ee72431d9c..d7c7f0f10d3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -29,6 +29,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" + "github.com/coreos/pkg/capnslog" "golang.org/x/net/context" apierrors "k8s.io/apimachinery/pkg/api/errors" apitesting "k8s.io/apimachinery/pkg/api/testing" @@ -55,6 +56,8 @@ func init() { metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) example.AddToScheme(scheme) examplev1.AddToScheme(scheme) + + capnslog.SetGlobalLogLevel(capnslog.CRITICAL) } // prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. @@ -727,25 +730,44 @@ func TestList(t *testing.T) { // | - test // | // - two-level/ - // - 1/ + // | - 1/ + // | | - test + // | | + // | - 2/ + // | - test + // | + // - z-level/ + // - 3/ // | - test // | - // - 2/ - // - test + // - 3/ + // - test-2 preset := []struct { key string obj *example.Pod storedObj *example.Pod - }{{ - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }} + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + { + key: "/z-level/3/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "fourth"}}, + }, + { + key: "/z-level/3/test-2", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } for i, ps := range preset { preset[i].storedObj = &example.Pod{} @@ -763,110 +785,302 @@ func TestList(t *testing.T) { t.Fatal(err) } + getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + } + tests := []struct { + name string disablePaging bool + rv string prefix string pred storage.SelectionPredicate expectedOut []*example.Pod expectContinue bool + expectError bool }{ - { // test List on existing key + { + name: "rejects invalid resource version", + prefix: "/", + pred: storage.Everything, + rv: "abc", + expectError: true, + }, + { + name: "rejects resource version and continue token", + prefix: "/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + Continue: secondContinuation, + }, + rv: "1", + expectError: true, + }, + { + name: "test List on existing key", prefix: "/one-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[0].storedObj}, }, - { // test List on non-existing key + { + name: "test List on non-existing key", prefix: "/non-existing/", pred: storage.Everything, expectedOut: nil, }, - { // test List with pod name matching + { + name: "test List with pod name matching", prefix: "/one-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, + Field: fields.ParseSelectorOrDie("metadata.name!=foo"), }, expectedOut: nil, }, - { // test List with limit + { + name: "test List with limit", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[1].storedObj}, expectContinue: true, }, - { // test List with limit when paging disabled + { + name: "test List with limit when paging disabled", disablePaging: true, prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, expectContinue: false, }, - { // test List with pregenerated continue token + { + name: "test List with pregenerated continue token", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, Continue: secondContinuation, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[2].storedObj}, }, - { // test List with multiple levels of directories and expect flattened result + { + name: "ignores resource version 0 for List with pregenerated continue token", + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + Continue: secondContinuation, + }, + rv: "0", + expectedOut: []*example.Pod{preset[2].storedObj}, + }, + { + name: "test List with multiple levels of directories and expect flattened result", prefix: "/two-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, }, + { + name: "test List with filter returning only one item, ensure only a single page returned", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 1, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: true, + }, + { + name: "test List with filter returning only one item, covers the entire list", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 2, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: false, + }, + { + name: "test List with filter returning only one item, covers the entire list, with resource version 0", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 2, + }, + rv: "0", + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: false, + }, + { + name: "test List with filter returning two items, more pages possible", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "foo"), + Label: labels.Everything(), + Limit: 2, + }, + expectContinue: true, + expectedOut: []*example.Pod{preset[0].storedObj, preset[1].storedObj}, + }, + { + name: "filter returns two items split across multiple pages", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + }, + expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, ends on last item, not full", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, starts on last item, full", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 1, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, starts on last item, partial page", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns two items, page size equal to total list size", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 5, + }, + expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, + }, + { + name: "filter returns one item, page size equal to total list size", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 5, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + }, } - for i, tt := range tests { + for _, tt := range tests { + if tt.pred.GetAttrs == nil { + tt.pred.GetAttrs = getAttrs + } + out := &example.PodList{} var err error if tt.disablePaging { - err = disablePagingStore.List(ctx, tt.prefix, "0", tt.pred, out) + err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out) } else { - err = store.List(ctx, tt.prefix, "0", tt.pred, out) + err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out) + } + if (err != nil) != tt.expectError { + t.Errorf("(%s): List failed: %v", tt.name, err) } if err != nil { - t.Fatalf("#%d: List failed: %v", i, err) + continue } if (len(out.Continue) > 0) != tt.expectContinue { - t.Errorf("#%d: unexpected continue token: %v", i, out.Continue) + t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue) } if len(tt.expectedOut) != len(out.Items) { - t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items)) continue } for j, wantPod := range tt.expectedOut { getPod := &out.Items[j] if !reflect.DeepEqual(wantPod, getPod) { - t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod) } } } +} + +func TestListContinuation(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } // test continuations out := &example.PodList{} @@ -994,16 +1208,16 @@ func Test_decodeContinue(t *testing.T) { wantRv int64 wantErr bool }{ - {name: "valid", args: args{continueValue: encodeContinueOrDie("v1alpha1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, + {name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, + {name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"}, {name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, {name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - separator", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "/"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1021,3 +1235,68 @@ func Test_decodeContinue(t *testing.T) { }) } } + +func Test_growSlice(t *testing.T) { + type args struct { + t reflect.Type + initialCapacity int + v reflect.Value + maxCapacity int + sizes []int + } + tests := []struct { + name string + args args + cap int + }{ + { + name: "empty", + args: args{v: reflect.ValueOf([]example.Pod{})}, + cap: 0, + }, + { + name: "no sizes", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10}, + cap: 10, + }, + { + name: "above maxCapacity", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{1, 12}}, + cap: 10, + }, + { + name: "takes max", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{8, 4}}, + cap: 8, + }, + { + name: "with existing capacity above max", + args: args{initialCapacity: 12, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 12, + }, + { + name: "with existing capacity below max", + args: args{initialCapacity: 5, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 8, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.args.initialCapacity > 0 { + tt.args.v = reflect.ValueOf(make([]example.Pod, 0, tt.args.initialCapacity)) + } + // reflection requires that the value be addressible in order to call set, + // so we must ensure the value we created is available on the heap (not a problem + // for normal usage) + if !tt.args.v.CanAddr() { + x := reflect.New(tt.args.v.Type()) + x.Elem().Set(tt.args.v) + tt.args.v = x.Elem() + } + growSlice(tt.args.v, tt.args.maxCapacity, tt.args.sizes...) + if tt.cap != tt.args.v.Cap() { + t.Errorf("Unexpected capacity: got=%d want=%d", tt.args.v.Cap(), tt.cap) + } + }) + } +} diff --git a/staging/src/k8s.io/client-go/tools/cache/listwatch.go b/staging/src/k8s.io/client-go/tools/cache/listwatch.go index 55a90b631d6..454d50aadc6 100644 --- a/staging/src/k8s.io/client-go/tools/cache/listwatch.go +++ b/staging/src/k8s.io/client-go/tools/cache/listwatch.go @@ -51,8 +51,7 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc - // DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in - // 1.9 will allow a controller to opt out of chunking. + // DisableChunking requests no chunking for this list watcher. DisableChunking bool } @@ -93,9 +92,7 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration { // List a set of apiserver resources func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { - // chunking will become the default for list watchers starting in Kubernetes 1.9, unless - // otherwise disabled. - if false && !lw.DisableChunking { + if !lw.DisableChunking { return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options) } return lw.ListFunc(options) diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index 115fcf5c7cb..a96b152e380 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -9,6 +9,7 @@ go_library( name = "go_default_library", srcs = [ "aggregator.go", + "chunking.go", "custom_resource_definition.go", "etcd_failure.go", "framework.go", @@ -58,6 +59,7 @@ go_library( "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", "//vendor/k8s.io/client-go/util/retry:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", "//vendor/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library", ], diff --git a/test/e2e/apimachinery/chunking.go b/test/e2e/apimachinery/chunking.go new file mode 100644 index 00000000000..3892ad353be --- /dev/null +++ b/test/e2e/apimachinery/chunking.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apimachinery + +import ( + "fmt" + "math/rand" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/test/e2e/framework" +) + +const numberOfTotalResources = 400 + +var _ = SIGDescribe("Servers with support for API chunking", func() { + f := framework.NewDefaultFramework("chunking") + + It("should return chunks of results for list calls", func() { + ns := f.Namespace.Name + c := f.ClientSet + client := c.Core().PodTemplates(ns) + + By("creating a large number of resources") + workqueue.Parallelize(20, numberOfTotalResources, func(i int) { + for tries := 3; tries >= 0; tries-- { + _, err := client.Create(&v1.PodTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("template-%04d", i), + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Name: "test", Image: "test2"}, + }, + }, + }, + }) + if err == nil { + return + } + framework.Logf("Got an error creating template %d: %v", i, err) + } + Fail("Unable to create template %d, exiting", i) + }) + + By("retrieving those results in paged fashion several times") + for i := 0; i < 3; i++ { + opts := metav1.ListOptions{} + found := 0 + var lastRV string + for { + opts.Limit = int64(rand.Int31n(numberOfTotalResources/10) + 1) + list, err := client.List(opts) + Expect(err).ToNot(HaveOccurred()) + framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, list.Continue) + + if len(lastRV) == 0 { + lastRV = list.ResourceVersion + } + if lastRV != list.ResourceVersion { + Expect(list.ResourceVersion).To(Equal(lastRV)) + } + for _, item := range list.Items { + Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found))) + found++ + } + if len(list.Continue) == 0 { + break + } + opts.Continue = list.Continue + } + Expect(found).To(BeNumerically("==", numberOfTotalResources)) + } + + By("retrieving those results all at once") + list, err := client.List(metav1.ListOptions{Limit: numberOfTotalResources + 1}) + Expect(err).ToNot(HaveOccurred()) + Expect(list.Items).To(HaveLen(numberOfTotalResources)) + }) +})