Merge pull request #52949 from smarterclayton/enable_paging

Automatic merge from submit-queue (batch tested with PRs 52354, 52949, 53551). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Enable API chunking and promote to beta for 1.9

All list watchers default to using chunking.  The server by default fills pages to avoid low-cardinality filters from making excessive numbers of requests.  Also fixes an issue with continuation tokens where a `../` path-traversal sequence could be injected when the feature was enabled.

```release-note
API chunking via the `limit` and `continue` request parameters is promoted to beta in this release.  Client libraries using the Informer or ListWatch types will automatically opt in to chunking.
```
This commit is contained in:
Kubernetes Submit Queue 2017-10-10 15:45:29 -07:00 committed by GitHub
commit 23cc4dc50a
9 changed files with 601 additions and 131 deletions

View File

@ -201,7 +201,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
genericfeatures.AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta}, genericfeatures.AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta},
genericfeatures.APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, genericfeatures.APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
genericfeatures.Initializers: {Default: false, PreRelease: utilfeature.Alpha}, genericfeatures.Initializers: {Default: false, PreRelease: utilfeature.Alpha},
genericfeatures.APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, genericfeatures.APIListChunking: {Default: true, PreRelease: utilfeature.Beta},
// inherited features from apiextensions-apiserver, relisted here to get a conflict if it is changed // inherited features from apiextensions-apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side: // unintentionally on either side:

View File

@ -58,6 +58,7 @@ const (
// owner: @smarterclayton // owner: @smarterclayton
// alpha: v1.8 // alpha: v1.8
// beta: v1.9
// //
// Allow API clients to retrieve resource lists in chunks rather than // Allow API clients to retrieve resource lists in chunks rather than
// all at once. // all at once.
@ -76,5 +77,5 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta}, AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta},
APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
Initializers: {Default: false, PreRelease: utilfeature.Alpha}, Initializers: {Default: false, PreRelease: utilfeature.Alpha},
APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, APIListChunking: {Default: true, PreRelease: utilfeature.Beta},
} }

View File

@ -468,10 +468,15 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) { hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit {
// If resourceVersion is not specified, serve it from underlying // If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation or limit is // storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well. // requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
return c.storage.List(ctx, key, resourceVersion, pred, listObj) return c.storage.List(ctx, key, resourceVersion, pred, listObj)
} }

View File

@ -18,6 +18,7 @@ go_test(
"//vendor/github.com/coreos/etcd/clientv3:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library",
"//vendor/github.com/coreos/etcd/integration:go_default_library", "//vendor/github.com/coreos/etcd/integration:go_default_library",
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library", "//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library",

View File

@ -396,11 +396,11 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if err != nil { if err != nil {
return storage.NewInternalError(err.Error()) return storage.NewInternalError(err.Error())
} }
elems := []*elemForDecode{{ v, err := conversion.EnforcePtr(listPtr)
data: data, if err != nil || v.Kind() != reflect.Slice {
rev: uint64(getResp.Kvs[0].ModRevision), panic("need ptr to slice")
}} }
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil {
return err return err
} }
// update version with cluster level revision // update version with cluster level revision
@ -429,25 +429,26 @@ func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64,
return "", 0, fmt.Errorf("continue key is not valid: %v", err) return "", 0, fmt.Errorf("continue key is not valid: %v", err)
} }
switch c.APIVersion { switch c.APIVersion {
case "v1alpha1": case "meta.k8s.io/v1":
if c.ResourceVersion == 0 { if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version v1alpha1)") return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
} }
if len(c.StartKey) == 0 { if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version v1alpha1)") return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
} }
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot // defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with // be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than // continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix. // keyPrefix.
cleaned := path.Clean(c.StartKey) key := c.StartKey
if cleaned != c.StartKey || cleaned == "." || cleaned == "/" { if !strings.HasPrefix(key, "/") {
return "", 0, fmt.Errorf("continue key is not valid: %s", cleaned) key = "/" + key
} }
if len(cleaned) == 0 { cleaned := path.Clean(key)
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version 0)") if cleaned != key {
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
} }
return keyPrefix + cleaned, c.ResourceVersion, nil return keyPrefix + cleaned[1:], c.ResourceVersion, nil
default: default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion) return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
} }
@ -459,7 +460,7 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
if nextKey == key { if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match") return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
} }
out, err := json.Marshal(&continueToken{APIVersion: "v1alpha1", ResourceVersion: resourceVersion, StartKey: nextKey}) out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil { if err != nil {
return "", err return "", err
} }
@ -472,7 +473,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
if err != nil { if err != nil {
return err return err
} }
key = path.Join(s.pathPrefix, key) v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}
if s.pathPrefix != "" {
key = path.Join(s.pathPrefix, key)
}
// We need to make sure the key ended with "/" so that we only get children "directories". // We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer. // while with prefix "/a/" will return only "/a/b" which is the correct answer.
@ -481,81 +489,166 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
} }
keyPrefix := key keyPrefix := key
filter := storage.SimpleFilter(pred)
// set the appropriate clientv3 options to filter the returned data set // set the appropriate clientv3 options to filter the returned data set
var paging bool
options := make([]clientv3.OpOption, 0, 4) options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 { if s.pagingEnabled && pred.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(pred.Limit)) options = append(options, clientv3.WithLimit(pred.Limit))
} }
var returnedRV int64 var returnedRV int64
switch { switch {
case s.pagingEnabled && len(pred.Continue) > 0: case s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix) continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix)
if err != nil { if err != nil {
return err return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
} }
options = append(options, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key))) if len(resourceVersion) > 0 && resourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
key = continueKey key = continueKey
options = append(options, clientv3.WithRev(continueRV)) options = append(options, clientv3.WithRev(continueRV))
returnedRV = continueRV returnedRV = continueRV
case len(resourceVersion) > 0: case s.pagingEnabled && pred.Limit > 0:
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) if len(resourceVersion) > 0 {
if err != nil { fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
return fmt.Errorf("invalid resource version: %v", err) if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV))
}
returnedRV = fromRV
} }
options = append(options, clientv3.WithPrefix(), clientv3.WithRev(fromRV)) rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
returnedRV = fromRV options = append(options, clientv3.WithRange(rangeEnd))
default: default:
if len(resourceVersion) > 0 {
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV))
}
returnedRV = fromRV
}
options = append(options, clientv3.WithPrefix()) options = append(options, clientv3.WithPrefix())
} }
getResp, err := s.client.KV.Get(ctx, key, options...) // loop until we have filled the requested limit from etcd or there are no more results
if err != nil { var lastKey []byte
return interpretListError(err, len(pred.Continue) > 0) var hasMore bool
} for {
getResp, err := s.client.KV.Get(ctx, key, options...)
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
for _, kv := range getResp.Kvs {
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) return interpretListError(err, len(pred.Continue) > 0)
continue }
hasMore = getResp.More
if len(getResp.Kvs) == 0 && getResp.More {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
} }
elems = append(elems, &elemForDecode{ // avoid small allocations for the result slice, since this can be called in many
data: data, // different contexts and we don't know how significantly the result will be filtered
rev: uint64(kv.ModRevision), if pred.Empty() {
}) growSlice(v, len(getResp.Kvs))
} else {
growSlice(v, 2048, len(getResp.Kvs))
}
// take items from the response until the bucket is full, filtering as we go
for _, kv := range getResp.Kvs {
if paging && int64(v.Len()) >= pred.Limit {
hasMore = true
break
}
lastKey = kv.Key
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err))
continue
}
if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil {
return err
}
}
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
// no more results remain or we didn't request paging
if !hasMore || !paging {
break
}
// we're paging but we have filled our bucket
if int64(v.Len()) >= pred.Limit {
break
}
key = string(lastKey) + "\x00"
} }
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { // instruct the client to begin querying from immediately after the last key we returned
return err // we never return a key that the client wouldn't be allowed to see
} if hasMore {
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
switch {
case !getResp.More:
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
case len(getResp.Kvs) == 0:
return fmt.Errorf("no results were found, but etcd indicated there were more values")
default:
// we want to start immediately after the last key // we want to start immediately after the last key
// TODO: this reveals info about certain keys next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
key := string(getResp.Kvs[len(getResp.Kvs)-1].Key)
next, err := encodeContinue(key+"\x00", keyPrefix, returnedRV)
if err != nil { if err != nil {
return err return err
} }
return s.versioner.UpdateList(listObj, uint64(returnedRV), next) return s.versioner.UpdateList(listObj, uint64(returnedRV), next)
} }
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
}
// growSlice takes a slice value and grows its capacity up
// to the maximum of the passed sizes or maxCapacity, whichever
// is smaller. Above maxCapacity decisions about allocation are left
// to the Go runtime on append. This allows a caller to make an
// educated guess about the potential size of the total list while
// still avoiding overly aggressive initial allocation. If sizes
// is empty maxCapacity will be used as the size to grow.
func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
cap := v.Cap()
max := cap
for _, size := range sizes {
if size > max {
max = size
}
}
if len(sizes) == 0 || max > maxCapacity {
max = maxCapacity
}
if max <= cap {
return
}
if v.Len() > 0 {
extra := reflect.MakeSlice(v.Type(), 0, max)
reflect.Copy(extra, v)
v.Set(extra)
} else {
extra := reflect.MakeSlice(v.Type(), 0, max)
v.Set(extra)
}
} }
// Watch implements storage.Interface.Watch. // Watch implements storage.Interface.Watch.
@ -677,23 +770,16 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
return nil return nil
} }
// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. // appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
// On success, ListPtr would be set to the list of objects. func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error {
func decodeList(elems []*elemForDecode, filter storage.FilterFunc, listPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
v, err := conversion.EnforcePtr(listPtr) if err != nil {
if err != nil || v.Kind() != reflect.Slice { return err
panic("need ptr to slice")
} }
for _, elem := range elems { // being unable to set the version does not prevent the object from being extracted
obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) versioner.UpdateObject(obj, rev)
if err != nil { if filter(obj) {
return err v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
// being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, elem.rev)
if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
} }
return nil return nil
} }

View File

@ -29,6 +29,7 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration" "github.com/coreos/etcd/integration"
"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context" "golang.org/x/net/context"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
apitesting "k8s.io/apimachinery/pkg/api/testing" apitesting "k8s.io/apimachinery/pkg/api/testing"
@ -55,6 +56,8 @@ func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
example.AddToScheme(scheme) example.AddToScheme(scheme)
examplev1.AddToScheme(scheme) examplev1.AddToScheme(scheme)
capnslog.SetGlobalLogLevel(capnslog.CRITICAL)
} }
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. // prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
@ -727,25 +730,44 @@ func TestList(t *testing.T) {
// | - test // | - test
// | // |
// - two-level/ // - two-level/
// - 1/ // | - 1/
// | | - test
// | |
// | - 2/
// | - test
// |
// - z-level/
// - 3/
// | - test // | - test
// | // |
// - 2/ // - 3/
// - test // - test-2
preset := []struct { preset := []struct {
key string key string
obj *example.Pod obj *example.Pod
storedObj *example.Pod storedObj *example.Pod
}{{ }{
key: "/one-level/test", {
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, key: "/one-level/test",
}, { obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
key: "/two-level/1/test", },
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, {
}, { key: "/two-level/1/test",
key: "/two-level/2/test", obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, },
}} {
key: "/two-level/2/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
{
key: "/z-level/3/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "fourth"}},
},
{
key: "/z-level/3/test-2",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
}
for i, ps := range preset { for i, ps := range preset {
preset[i].storedObj = &example.Pod{} preset[i].storedObj = &example.Pod{}
@ -763,110 +785,302 @@ func TestList(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
}
tests := []struct { tests := []struct {
name string
disablePaging bool disablePaging bool
rv string
prefix string prefix string
pred storage.SelectionPredicate pred storage.SelectionPredicate
expectedOut []*example.Pod expectedOut []*example.Pod
expectContinue bool expectContinue bool
expectError bool
}{ }{
{ // test List on existing key {
name: "rejects invalid resource version",
prefix: "/",
pred: storage.Everything,
rv: "abc",
expectError: true,
},
{
name: "rejects resource version and continue token",
prefix: "/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
Continue: secondContinuation,
},
rv: "1",
expectError: true,
},
{
name: "test List on existing key",
prefix: "/one-level/", prefix: "/one-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
}, },
{ // test List on non-existing key {
name: "test List on non-existing key",
prefix: "/non-existing/", prefix: "/non-existing/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: nil, expectedOut: nil,
}, },
{ // test List with pod name matching {
name: "test List with pod name matching",
prefix: "/one-level/", prefix: "/one-level/",
pred: storage.SelectionPredicate{ pred: storage.SelectionPredicate{
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), Field: fields.ParseSelectorOrDie("metadata.name!=foo"),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}, },
expectedOut: nil, expectedOut: nil,
}, },
{ // test List with limit {
name: "test List with limit",
prefix: "/two-level/", prefix: "/two-level/",
pred: storage.SelectionPredicate{ pred: storage.SelectionPredicate{
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.Everything(), Field: fields.Everything(),
Limit: 1, Limit: 1,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}, },
expectedOut: []*example.Pod{preset[1].storedObj}, expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true, expectContinue: true,
}, },
{ // test List with limit when paging disabled {
name: "test List with limit when paging disabled",
disablePaging: true, disablePaging: true,
prefix: "/two-level/", prefix: "/two-level/",
pred: storage.SelectionPredicate{ pred: storage.SelectionPredicate{
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.Everything(), Field: fields.Everything(),
Limit: 1, Limit: 1,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}, },
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
expectContinue: false, expectContinue: false,
}, },
{ // test List with pregenerated continue token {
name: "test List with pregenerated continue token",
prefix: "/two-level/", prefix: "/two-level/",
pred: storage.SelectionPredicate{ pred: storage.SelectionPredicate{
Label: labels.Everything(), Label: labels.Everything(),
Field: fields.Everything(), Field: fields.Everything(),
Limit: 1, Limit: 1,
Continue: secondContinuation, Continue: secondContinuation,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}, },
expectedOut: []*example.Pod{preset[2].storedObj}, expectedOut: []*example.Pod{preset[2].storedObj},
}, },
{ // test List with multiple levels of directories and expect flattened result {
name: "ignores resource version 0 for List with pregenerated continue token",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
Continue: secondContinuation,
},
rv: "0",
expectedOut: []*example.Pod{preset[2].storedObj},
},
{
name: "test List with multiple levels of directories and expect flattened result",
prefix: "/two-level/", prefix: "/two-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
}, },
{
name: "test List with filter returning only one item, ensure only a single page returned",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[3].storedObj},
expectContinue: true,
},
{
name: "test List with filter returning only one item, covers the entire list",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 2,
},
expectedOut: []*example.Pod{preset[3].storedObj},
expectContinue: false,
},
{
name: "test List with filter returning only one item, covers the entire list, with resource version 0",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 2,
},
rv: "0",
expectedOut: []*example.Pod{preset[3].storedObj},
expectContinue: false,
},
{
name: "test List with filter returning two items, more pages possible",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "foo"),
Label: labels.Everything(),
Limit: 2,
},
expectContinue: true,
expectedOut: []*example.Pod{preset[0].storedObj, preset[1].storedObj},
},
{
name: "filter returns two items split across multiple pages",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
},
expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj},
},
{
name: "filter returns one item for last page, ends on last item, not full",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
{
name: "filter returns one item for last page, starts on last item, full",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 1,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
{
name: "filter returns one item for last page, starts on last item, partial page",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 2,
Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"),
},
expectedOut: []*example.Pod{preset[4].storedObj},
},
{
name: "filter returns two items, page size equal to total list size",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
Label: labels.Everything(),
Limit: 5,
},
expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj},
},
{
name: "filter returns one item, page size equal to total list size",
prefix: "/",
pred: storage.SelectionPredicate{
Field: fields.OneTermEqualSelector("metadata.name", "fourth"),
Label: labels.Everything(),
Limit: 5,
},
expectedOut: []*example.Pod{preset[3].storedObj},
},
} }
for i, tt := range tests { for _, tt := range tests {
if tt.pred.GetAttrs == nil {
tt.pred.GetAttrs = getAttrs
}
out := &example.PodList{} out := &example.PodList{}
var err error var err error
if tt.disablePaging { if tt.disablePaging {
err = disablePagingStore.List(ctx, tt.prefix, "0", tt.pred, out) err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out)
} else { } else {
err = store.List(ctx, tt.prefix, "0", tt.pred, out) err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out)
}
if (err != nil) != tt.expectError {
t.Errorf("(%s): List failed: %v", tt.name, err)
} }
if err != nil { if err != nil {
t.Fatalf("#%d: List failed: %v", i, err) continue
} }
if (len(out.Continue) > 0) != tt.expectContinue { if (len(out.Continue) > 0) != tt.expectContinue {
t.Errorf("#%d: unexpected continue token: %v", i, out.Continue) t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue)
} }
if len(tt.expectedOut) != len(out.Items) { if len(tt.expectedOut) != len(out.Items) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items))
continue continue
} }
for j, wantPod := range tt.expectedOut { for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j] getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) { if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod)
} }
} }
} }
}
func TestListContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
// /
// - one-level/
// | - test
// |
// - two-level/
// - 1/
// | - test
// |
// - 2/
// - test
//
preset := []struct {
key string
obj *example.Pod
storedObj *example.Pod
}{
{
key: "/one-level/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/1/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
key: "/two-level/2/test",
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
},
}
for i, ps := range preset {
preset[i].storedObj = &example.Pod{}
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
if err != nil {
t.Fatalf("Set failed: %v", err)
}
}
// test continuations // test continuations
out := &example.PodList{} out := &example.PodList{}
@ -994,16 +1208,16 @@ func Test_decodeContinue(t *testing.T) {
wantRv int64 wantRv int64
wantErr bool wantErr bool
}{ }{
{name: "valid", args: args{continueValue: encodeContinueOrDie("v1alpha1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, {name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"},
{name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"},
{name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, {name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, {name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - separator", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "/"), keyPrefix: "/test/"}, wantErr: true},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
@ -1021,3 +1235,68 @@ func Test_decodeContinue(t *testing.T) {
}) })
} }
} }
// Test_growSlice verifies the capacity-growth heuristic used when decoding
// paged list results: growSlice should grow the target slice's capacity
// toward the largest of the provided size estimates, but never beyond
// maxCapacity, and must never shrink a capacity the slice already has.
func Test_growSlice(t *testing.T) {
	type args struct {
		initialCapacity int
		v               reflect.Value
		maxCapacity     int
		sizes           []int
	}
	tests := []struct {
		name string
		args args
		cap  int // expected capacity after growSlice
	}{
		{
			name: "empty",
			args: args{v: reflect.ValueOf([]example.Pod{})},
			cap:  0,
		},
		{
			name: "no sizes",
			args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10},
			cap:  10,
		},
		{
			name: "above maxCapacity",
			args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{1, 12}},
			cap:  10,
		},
		{
			name: "takes max",
			args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{8, 4}},
			cap:  8,
		},
		{
			name: "with existing capacity above max",
			args: args{initialCapacity: 12, maxCapacity: 10, sizes: []int{8, 4}},
			cap:  12,
		},
		{
			name: "with existing capacity below max",
			args: args{initialCapacity: 5, maxCapacity: 10, sizes: []int{8, 4}},
			cap:  8,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if tt.args.initialCapacity > 0 {
				tt.args.v = reflect.ValueOf(make([]example.Pod, 0, tt.args.initialCapacity))
			}
			// reflection requires that the value be addressable in order to call set,
			// so we must ensure the value we created is available on the heap (not a problem
			// for normal usage)
			if !tt.args.v.CanAddr() {
				x := reflect.New(tt.args.v.Type())
				x.Elem().Set(tt.args.v)
				tt.args.v = x.Elem()
			}
			growSlice(tt.args.v, tt.args.maxCapacity, tt.args.sizes...)
			if tt.cap != tt.args.v.Cap() {
				t.Errorf("Unexpected capacity: got=%d want=%d", tt.args.v.Cap(), tt.cap)
			}
		})
	}
}

View File

@ -51,8 +51,7 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
type ListWatch struct { type ListWatch struct {
ListFunc ListFunc ListFunc ListFunc
WatchFunc WatchFunc WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in // DisableChunking requests no chunking for this list watcher.
// 1.9 will allow a controller to opt out of chunking.
DisableChunking bool DisableChunking bool
} }
@ -93,9 +92,7 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
// List a set of apiserver resources // List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
// chunking will become the default for list watchers starting in Kubernetes 1.9, unless if !lw.DisableChunking {
// otherwise disabled.
if false && !lw.DisableChunking {
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options) return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
} }
return lw.ListFunc(options) return lw.ListFunc(options)

View File

@ -9,6 +9,7 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"aggregator.go", "aggregator.go",
"chunking.go",
"custom_resource_definition.go", "custom_resource_definition.go",
"etcd_failure.go", "etcd_failure.go",
"framework.go", "framework.go",
@ -58,6 +59,7 @@ go_library(
"//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/util/cert:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library", "//vendor/k8s.io/client-go/util/retry:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library",
"//vendor/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library", "//vendor/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library",
], ],

View File

@ -0,0 +1,99 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"fmt"
"math/rand"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/test/e2e/framework"
)
const numberOfTotalResources = 400
// Verifies that the apiserver supports chunked (paged) LIST responses: a
// large set of PodTemplates can be retrieved page by page via the limit and
// continue parameters, with a consistent resourceVersion reported across
// every page of a single paged list.
var _ = SIGDescribe("Servers with support for API chunking", func() {
	f := framework.NewDefaultFramework("chunking")
	It("should return chunks of results for list calls", func() {
		ns := f.Namespace.Name
		c := f.ClientSet
		client := c.Core().PodTemplates(ns)
		By("creating a large number of resources")
		workqueue.Parallelize(20, numberOfTotalResources, func(i int) {
			// Retry each create a few times to ride out transient apiserver errors.
			for tries := 3; tries >= 0; tries-- {
				_, err := client.Create(&v1.PodTemplate{
					ObjectMeta: metav1.ObjectMeta{
						Name: fmt.Sprintf("template-%04d", i),
					},
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{Name: "test", Image: "test2"},
							},
						},
					},
				})
				if err == nil {
					return
				}
				framework.Logf("Got an error creating template %d: %v", i, err)
			}
			// Ginkgo's Fail takes (message string, callerSkip ...int); passing i
			// directly would be misread as a stack-skip count and the %d would
			// never be formatted. Failf formats the message properly.
			framework.Failf("Unable to create template %d, exiting", i)
		})
		By("retrieving those results in paged fashion several times")
		for i := 0; i < 3; i++ {
			opts := metav1.ListOptions{}
			found := 0
			var lastRV string
			for {
				// Use a random page size per request to exercise varied limits.
				opts.Limit = int64(rand.Int31n(numberOfTotalResources/10) + 1)
				list, err := client.List(opts)
				Expect(err).ToNot(HaveOccurred())
				framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, list.Continue)
				// All pages of one paged list must share the same resourceVersion.
				if len(lastRV) == 0 {
					lastRV = list.ResourceVersion
				}
				if lastRV != list.ResourceVersion {
					Expect(list.ResourceVersion).To(Equal(lastRV))
				}
				// Names are zero-padded, so items must arrive in creation order.
				for _, item := range list.Items {
					Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found)))
					found++
				}
				// An empty continue token marks the final page.
				if len(list.Continue) == 0 {
					break
				}
				opts.Continue = list.Continue
			}
			Expect(found).To(BeNumerically("==", numberOfTotalResources))
		}
		By("retrieving those results all at once")
		list, err := client.List(metav1.ListOptions{Limit: numberOfTotalResources + 1})
		Expect(err).ToNot(HaveOccurred())
		Expect(list.Items).To(HaveLen(numberOfTotalResources))
	})
})