From 113889e72da573946f3389b628251647c2a858b9 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 23 Sep 2017 15:21:54 -0400 Subject: [PATCH 1/5] Enable API chunking and promote to beta for 1.9 All list watchers default to using chunking. --- pkg/features/kube_features.go | 2 +- staging/src/k8s.io/apiserver/pkg/features/kube_features.go | 3 ++- staging/src/k8s.io/client-go/tools/cache/listwatch.go | 7 ++----- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 35a90a1ac25..7033678cc69 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -201,7 +201,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS genericfeatures.AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta}, genericfeatures.APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, genericfeatures.Initializers: {Default: false, PreRelease: utilfeature.Alpha}, - genericfeatures.APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, + genericfeatures.APIListChunking: {Default: true, PreRelease: utilfeature.Beta}, // inherited features from apiextensions-apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index d5b52fd8758..57bab8b0024 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -58,6 +58,7 @@ const ( // owner: @smarterclayton // alpha: v1.8 + // beta: v1.9 // // Allow API clients to retrieve resource lists in chunks rather than // all at once. @@ -76,5 +77,5 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS AdvancedAuditing: {Default: true, PreRelease: utilfeature.Beta}, APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, Initializers: {Default: false, PreRelease: utilfeature.Alpha}, - APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, + APIListChunking: {Default: true, PreRelease: utilfeature.Beta}, } diff --git a/staging/src/k8s.io/client-go/tools/cache/listwatch.go b/staging/src/k8s.io/client-go/tools/cache/listwatch.go index 55a90b631d6..454d50aadc6 100644 --- a/staging/src/k8s.io/client-go/tools/cache/listwatch.go +++ b/staging/src/k8s.io/client-go/tools/cache/listwatch.go @@ -51,8 +51,7 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc - // DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in - // 1.9 will allow a controller to opt out of chunking. + // DisableChunking requests no chunking for this list watcher. DisableChunking bool } @@ -93,9 +92,7 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration { // List a set of apiserver resources func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { - // chunking will become the default for list watchers starting in Kubernetes 1.9, unless - // otherwise disabled. - if false && !lw.DisableChunking { + if !lw.DisableChunking { return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options) } return lw.ListFunc(options) From da7124e5e5c0385dd5bcfc72ef035effc7708913 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 24 Sep 2017 18:06:57 -0400 Subject: [PATCH 2/5] Fill partial pages on the server rather than forcing client to The etcd3 storage now attempts to fill partial pages to prevent clients having to make more round trips (latency from server to etcd is lower than client to server). The server makes repeated requests to etcd of the current page size, then uses the filter function to eliminate any matches. After this change the apiserver will always return full pages, but we leave the language in place that clients must tolerate it. Reduces tail latency of large filtered lists, such as viewing pods assigned to a node. --- .../apiserver/pkg/storage/etcd3/store.go | 170 ++++++---- .../apiserver/pkg/storage/etcd3/store_test.go | 299 +++++++++++++++--- test/e2e/apimachinery/BUILD | 2 + test/e2e/apimachinery/chunking.go | 99 ++++++ 4 files changed, 464 insertions(+), 106 deletions(-) create mode 100644 test/e2e/apimachinery/chunking.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index e5a67f043ec..1b4bf0f9fec 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -396,11 +396,11 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if err != nil { return storage.NewInternalError(err.Error()) } - elems := []*elemForDecode{{ - data: data, - rev: uint64(getResp.Kvs[0].ModRevision), - }} - if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + panic("need ptr to slice") + } + if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil { return err } // update version with cluster level revision @@ -472,7 +472,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor if err != nil { return err } - key = path.Join(s.pathPrefix, key) + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + panic("need ptr to slice") + } + + if s.pathPrefix != "" { + key = path.Join(s.pathPrefix, key) + } // We need to make sure the key ended with "/" so that we only get children "directories". // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, // while with prefix "/a/" will return only "/a/b" which is the correct answer. @@ -481,81 +488,127 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor } keyPrefix := key + filter := storage.SimpleFilter(pred) + // set the appropriate clientv3 options to filter the returned data set + var paging bool options := make([]clientv3.OpOption, 0, 4) if s.pagingEnabled && pred.Limit > 0 { + paging = true options = append(options, clientv3.WithLimit(pred.Limit)) } + var returnedRV int64 switch { case s.pagingEnabled && len(pred.Continue) > 0: continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix) if err != nil { - return err + return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) } - options = append(options, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key))) + if len(resourceVersion) > 0 && resourceVersion != "0" { + return apierrors.NewBadRequest("specifying resource version is not allowed when using continue") + } + + rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) + options = append(options, clientv3.WithRange(rangeEnd)) key = continueKey options = append(options, clientv3.WithRev(continueRV)) returnedRV = continueRV - case len(resourceVersion) > 0: - fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) - if err != nil { - return fmt.Errorf("invalid resource version: %v", err) + case s.pagingEnabled && pred.Limit > 0: + if len(resourceVersion) > 0 { + fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + if fromRV > 0 { + options = append(options, clientv3.WithRev(fromRV)) + } + returnedRV = fromRV } - options = append(options, clientv3.WithPrefix(), clientv3.WithRev(fromRV)) - returnedRV = fromRV + rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) + options = append(options, clientv3.WithRange(rangeEnd)) default: + if len(resourceVersion) > 0 { + fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) + if err != nil { + return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) + } + if fromRV > 0 { + options = append(options, clientv3.WithRev(fromRV)) + } + returnedRV = fromRV + } + options = append(options, clientv3.WithPrefix()) } - getResp, err := s.client.KV.Get(ctx, key, options...) - if err != nil { - return interpretListError(err, len(pred.Continue) > 0) - } - - elems := make([]*elemForDecode, 0, len(getResp.Kvs)) - for _, kv := range getResp.Kvs { - data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key)) + // loop until we have filled the requested limit from etcd or there are no more results + var lastKey []byte + var hasMore bool + for { + getResp, err := s.client.KV.Get(ctx, key, options...) if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) - continue + return interpretListError(err, len(pred.Continue) > 0) + } + hasMore = getResp.More + + if len(getResp.Kvs) == 0 && getResp.More { + return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") } - elems = append(elems, &elemForDecode{ - data: data, - rev: uint64(kv.ModRevision), - }) + // take items from the response until the bucket is full, filtering as we go + for _, kv := range getResp.Kvs { + if paging && int64(v.Len()) >= pred.Limit { + hasMore = true + break + } + lastKey = kv.Key + + data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key)) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) + continue + } + + if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil { + return err + } + } + + // indicate to the client which resource version was returned + if returnedRV == 0 { + returnedRV = getResp.Header.Revision + } + + // no more results remain or we didn't request paging + if !hasMore || !paging { + break + } + // we're paging but we have filled our bucket + if int64(v.Len()) >= pred.Limit { + break + } + key = string(lastKey) + "\x00" } - if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { - return err - } - - // indicate to the client which resource version was returned - if returnedRV == 0 { - returnedRV = getResp.Header.Revision - } - switch { - case !getResp.More: - // no continuation - return s.versioner.UpdateList(listObj, uint64(returnedRV), "") - case len(getResp.Kvs) == 0: - return fmt.Errorf("no results were found, but etcd indicated there were more values") - default: + // instruct the client to begin querying from immediately after the last key we returned + // we never return a key that the client wouldn't be allowed to see + if hasMore { // we want to start immediately after the last key - // TODO: this reveals info about certain keys - key := string(getResp.Kvs[len(getResp.Kvs)-1].Key) - next, err := encodeContinue(key+"\x00", keyPrefix, returnedRV) + next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV) if err != nil { return err } return s.versioner.UpdateList(listObj, uint64(returnedRV), next) } + + // no continuation + return s.versioner.UpdateList(listObj, uint64(returnedRV), "") } // Watch implements storage.Interface.Watch. @@ -677,23 +730,16 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP return nil } -// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. -// On success, ListPtr would be set to the list of objects. -func decodeList(elems []*elemForDecode, filter storage.FilterFunc, listPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { - v, err := conversion.EnforcePtr(listPtr) - if err != nil || v.Kind() != reflect.Slice { - panic("need ptr to slice") +// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice. +func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error { + obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) + if err != nil { + return err } - for _, elem := range elems { - obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) - if err != nil { - return err - } - // being unable to set the version does not prevent the object from being extracted - versioner.UpdateObject(obj, elem.rev) - if filter(obj) { - v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) - } + // being unable to set the version does not prevent the object from being extracted + versioner.UpdateObject(obj, rev) + if filter(obj) { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 6ee72431d9c..7e7320cfba8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -727,25 +727,44 @@ func TestList(t *testing.T) { // | - test // | // - two-level/ - // - 1/ + // | - 1/ + // | | - test + // | | + // | - 2/ + // | - test + // | + // - z-level/ + // - 3/ // | - test // | - // - 2/ - // - test + // - 3/ + // - test-2 preset := []struct { key string obj *example.Pod storedObj *example.Pod - }{{ - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }} + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + { + key: "/z-level/3/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "fourth"}}, + }, + { + key: "/z-level/3/test-2", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } for i, ps := range preset { preset[i].storedObj = &example.Pod{} @@ -763,110 +782,302 @@ func TestList(t *testing.T) { t.Fatal(err) } + getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + } + tests := []struct { + name string disablePaging bool + rv string prefix string pred storage.SelectionPredicate expectedOut []*example.Pod expectContinue bool + expectError bool }{ - { // test List on existing key + { + name: "rejects invalid resource version", + prefix: "/", + pred: storage.Everything, + rv: "abc", + expectError: true, + }, + { + name: "rejects resource version and continue token", + prefix: "/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + Continue: secondContinuation, + }, + rv: "1", + expectError: true, + }, + { + name: "test List on existing key", prefix: "/one-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[0].storedObj}, }, - { // test List on non-existing key + { + name: "test List on non-existing key", prefix: "/non-existing/", pred: storage.Everything, expectedOut: nil, }, - { // test List with pod name matching + { + name: "test List with pod name matching", prefix: "/one-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, + Field: fields.ParseSelectorOrDie("metadata.name!=foo"), }, expectedOut: nil, }, - { // test List with limit + { + name: "test List with limit", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[1].storedObj}, expectContinue: true, }, - { // test List with limit when paging disabled + { + name: "test List with limit when paging disabled", disablePaging: true, prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, expectContinue: false, }, - { // test List with pregenerated continue token + { + name: "test List with pregenerated continue token", prefix: "/two-level/", pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), Limit: 1, Continue: secondContinuation, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, }, expectedOut: []*example.Pod{preset[2].storedObj}, }, - { // test List with multiple levels of directories and expect flattened result + { + name: "ignores resource version 0 for List with pregenerated continue token", + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + Continue: secondContinuation, + }, + rv: "0", + expectedOut: []*example.Pod{preset[2].storedObj}, + }, + { + name: "test List with multiple levels of directories and expect flattened result", prefix: "/two-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, }, + { + name: "test List with filter returning only one item, ensure only a single page returned", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 1, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: true, + }, + { + name: "test List with filter returning only one item, covers the entire list", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 2, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: false, + }, + { + name: "test List with filter returning only one item, covers the entire list, with resource version 0", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 2, + }, + rv: "0", + expectedOut: []*example.Pod{preset[3].storedObj}, + expectContinue: false, + }, + { + name: "test List with filter returning two items, more pages possible", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "foo"), + Label: labels.Everything(), + Limit: 2, + }, + expectContinue: true, + expectedOut: []*example.Pod{preset[0].storedObj, preset[1].storedObj}, + }, + { + name: "filter returns two items split across multiple pages", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + }, + expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, ends on last item, not full", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, starts on last item, full", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 1, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns one item for last page, starts on last item, partial page", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + Continue: encodeContinueOrDie("meta.k8s.io/v1", int64(continueRV), "z-level/3/test-2"), + }, + expectedOut: []*example.Pod{preset[4].storedObj}, + }, + { + name: "filter returns two items, page size equal to total list size", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 5, + }, + expectedOut: []*example.Pod{preset[2].storedObj, preset[4].storedObj}, + }, + { + name: "filter returns one item, page size equal to total list size", + prefix: "/", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "fourth"), + Label: labels.Everything(), + Limit: 5, + }, + expectedOut: []*example.Pod{preset[3].storedObj}, + }, } - for i, tt := range tests { + for _, tt := range tests { + if tt.pred.GetAttrs == nil { + tt.pred.GetAttrs = getAttrs + } + out := &example.PodList{} var err error if tt.disablePaging { - err = disablePagingStore.List(ctx, tt.prefix, "0", tt.pred, out) + err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out) } else { - err = store.List(ctx, tt.prefix, "0", tt.pred, out) + err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out) + } + if (err != nil) != tt.expectError { + t.Errorf("(%s): List failed: %v", tt.name, err) } if err != nil { - t.Fatalf("#%d: List failed: %v", i, err) + continue } if (len(out.Continue) > 0) != tt.expectContinue { - t.Errorf("#%d: unexpected continue token: %v", i, out.Continue) + t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue) } if len(tt.expectedOut) != len(out.Items) { - t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items)) continue } for j, wantPod := range tt.expectedOut { getPod := &out.Items[j] if !reflect.DeepEqual(wantPod, getPod) { - t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod) } } } +} + +func TestListContinuation(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } // test continuations out := &example.PodList{} diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index 115fcf5c7cb..a96b152e380 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -9,6 +9,7 @@ go_library( name = "go_default_library", srcs = [ "aggregator.go", + "chunking.go", "custom_resource_definition.go", "etcd_failure.go", "framework.go", @@ -58,6 +59,7 @@ go_library( "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/util/cert:go_default_library", "//vendor/k8s.io/client-go/util/retry:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", "//vendor/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library", ], diff --git a/test/e2e/apimachinery/chunking.go b/test/e2e/apimachinery/chunking.go new file mode 100644 index 00000000000..3892ad353be --- /dev/null +++ b/test/e2e/apimachinery/chunking.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apimachinery + +import ( + "fmt" + "math/rand" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/test/e2e/framework" +) + +const numberOfTotalResources = 400 + +var _ = SIGDescribe("Servers with support for API chunking", func() { + f := framework.NewDefaultFramework("chunking") + + It("should return chunks of results for list calls", func() { + ns := f.Namespace.Name + c := f.ClientSet + client := c.Core().PodTemplates(ns) + + By("creating a large number of resources") + workqueue.Parallelize(20, numberOfTotalResources, func(i int) { + for tries := 3; tries >= 0; tries-- { + _, err := client.Create(&v1.PodTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("template-%04d", i), + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Name: "test", Image: "test2"}, + }, + }, + }, + }) + if err == nil { + return + } + framework.Logf("Got an error creating template %d: %v", i, err) + } + Fail("Unable to create template %d, exiting", i) + }) + + By("retrieving those results in paged fashion several times") + for i := 0; i < 3; i++ { + opts := metav1.ListOptions{} + found := 0 + var lastRV string + for { + opts.Limit = int64(rand.Int31n(numberOfTotalResources/10) + 1) + list, err := client.List(opts) + Expect(err).ToNot(HaveOccurred()) + framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, list.Continue) + + if len(lastRV) == 0 { + lastRV = list.ResourceVersion + } + if lastRV != list.ResourceVersion { + Expect(list.ResourceVersion).To(Equal(lastRV)) + } + for _, item := range list.Items { + Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found))) + found++ + } + if len(list.Continue) == 0 { + break + } + opts.Continue = list.Continue + } + Expect(found).To(BeNumerically("==", numberOfTotalResources)) + } + + By("retrieving those results all at once") + list, err := client.List(metav1.ListOptions{Limit: numberOfTotalResources + 1}) + Expect(err).ToNot(HaveOccurred()) + Expect(list.Items).To(HaveLen(numberOfTotalResources)) + }) +}) From ac8808b792f624fa04aa7c589bd5aca1b9afde39 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 24 Sep 2017 18:09:54 -0400 Subject: [PATCH 3/5] Promote continuation token schema to v1 Change the filtering logic to require a leading path and clean that afterwards. --- .../k8s.io/apiserver/pkg/storage/etcd3/BUILD | 1 + .../apiserver/pkg/storage/etcd3/store.go | 21 ++++++++++--------- .../apiserver/pkg/storage/etcd3/store_test.go | 16 ++++++++------ 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index ad5b06d819c..2a6ce84be24 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -18,6 +18,7 @@ go_test( "//vendor/github.com/coreos/etcd/clientv3:go_default_library", "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", "//vendor/github.com/coreos/etcd/integration:go_default_library", + "//vendor/github.com/coreos/pkg/capnslog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 1b4bf0f9fec..df9917ed61c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -429,25 +429,26 @@ func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, return "", 0, fmt.Errorf("continue key is not valid: %v", err) } switch c.APIVersion { - case "v1alpha1": + case "meta.k8s.io/v1": if c.ResourceVersion == 0 { - return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version v1alpha1)") + return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)") } if len(c.StartKey) == 0 { - return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version v1alpha1)") + return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)") } // defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot // be at a higher level of the hierarchy, and so when we append the key prefix we will end up with // continue start key that is fully qualified and cannot range over anything less specific than // keyPrefix. - cleaned := path.Clean(c.StartKey) - if cleaned != c.StartKey || cleaned == "." || cleaned == "/" { - return "", 0, fmt.Errorf("continue key is not valid: %s", cleaned) + key := c.StartKey + if !strings.HasPrefix(key, "/") { + key = "/" + key } - if len(cleaned) == 0 { - return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version 0)") + cleaned := path.Clean(key) + if cleaned != key { + return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey) } - return keyPrefix + cleaned, c.ResourceVersion, nil + return keyPrefix + cleaned[1:], c.ResourceVersion, nil default: return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion) } @@ -459,7 +460,7 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error if nextKey == key { return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match") } - out, err := json.Marshal(&continueToken{APIVersion: "v1alpha1", ResourceVersion: resourceVersion, StartKey: nextKey}) + out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey}) if err != nil { return "", err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 7e7320cfba8..8b36b611096 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -29,7 +29,9 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" + "github.com/coreos/pkg/capnslog" "golang.org/x/net/context" + apierrors "k8s.io/apimachinery/pkg/api/errors" apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -55,6 +57,8 @@ func init() { metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) example.AddToScheme(scheme) examplev1.AddToScheme(scheme) + + capnslog.SetGlobalLogLevel(capnslog.CRITICAL) } // prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. @@ -1205,16 +1209,16 @@ func Test_decodeContinue(t *testing.T) { wantRv int64 wantErr bool }{ - {name: "valid", args: args{continueValue: encodeContinueOrDie("v1alpha1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, + {name: "valid", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, + {name: "root path", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "/"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/"}, {name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, {name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, - {name: "path traversal - separator", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "/"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("meta.k8s.io/v1", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 6a76931e2cfd53991a2da20795b929fcaf7877e7 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 25 Sep 2017 14:22:25 -0400 Subject: [PATCH 4/5] Use watch cache when rv=0 even when limit is set --- staging/src/k8s.io/apiserver/pkg/storage/cacher.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index d2f39030a1b..45c94d2ebda 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -473,10 +473,15 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri // Implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) - if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) { + hasContinuation := pagingEnabled && len(pred.Continue) > 0 + hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" + if resourceVersion == "" || hasContinuation || hasLimit { // If resourceVersion is not specified, serve it from underlying - // storage (for backward compatibility). If a continuation or limit is + // storage (for backward compatibility). If a continuation is // requested, serve it from the underlying storage as well. + // Limits are only sent to storage when resourceVersion is non-zero + // since the watch cache isn't able to perform continuations, and + // limits are ignored when resource version is zero. return c.storage.List(ctx, key, resourceVersion, pred, listObj) } From ce0dc76901bd1ce36ca20c5cf96b89088d0e95a2 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 9 Oct 2017 22:16:13 -0400 Subject: [PATCH 5/5] Avoid intermediate List allocations as items added to the list Pick a reasonable middle ground between allocating larger chunks of memory (2048 * ~500b for pod slices) and having many small allocations as the list is resized by preallocating capacity based on the expected list size. At worst, we'll allocate a 1M slice for pods and only add a single pod to it (if the selector is very specific). --- .../apiserver/pkg/storage/etcd3/store.go | 39 +++++++++++ .../apiserver/pkg/storage/etcd3/store_test.go | 66 ++++++++++++++++++- 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index df9917ed61c..b0acc840165 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -562,6 +562,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") } + // avoid small allocations for the result slice, since this can be called in many + // different contexts and we don't know how significantly the result will be filtered + if pred.Empty() { + growSlice(v, len(getResp.Kvs)) + } else { + growSlice(v, 2048, len(getResp.Kvs)) + } + // take items from the response until the bucket is full, filtering as we go for _, kv := range getResp.Kvs { if paging && int64(v.Len()) >= pred.Limit { @@ -612,6 +620,37 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor return s.versioner.UpdateList(listObj, uint64(returnedRV), "") } +// growSlice takes a slice value and grows its capacity up +// to the maximum of the passed sizes or maxCapacity, whichever +// is smaller. Above maxCapacity decisions about allocation are left +// to the Go runtime on append. This allows a caller to make an +// educated guess about the potential size of the total list while +// still avoiding overly aggressive initial allocation. If sizes +// is empty maxCapacity will be used as the size to grow. +func growSlice(v reflect.Value, maxCapacity int, sizes ...int) { + cap := v.Cap() + max := cap + for _, size := range sizes { + if size > max { + max = size + } + } + if len(sizes) == 0 || max > maxCapacity { + max = maxCapacity + } + if max <= cap { + return + } + if v.Len() > 0 { + extra := reflect.MakeSlice(v.Type(), 0, max) + reflect.Copy(extra, v) + v.Set(extra) + } else { + extra := reflect.MakeSlice(v.Type(), 0, max) + v.Set(extra) + } +} + // Watch implements storage.Interface.Watch. func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { return s.watch(ctx, key, resourceVersion, pred, false) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 8b36b611096..d7c7f0f10d3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -31,7 +31,6 @@ import ( "github.com/coreos/etcd/integration" "github.com/coreos/pkg/capnslog" "golang.org/x/net/context" - apierrors "k8s.io/apimachinery/pkg/api/errors" apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -1236,3 +1235,68 @@ func Test_decodeContinue(t *testing.T) { }) } } + +func Test_growSlice(t *testing.T) { + type args struct { + t reflect.Type + initialCapacity int + v reflect.Value + maxCapacity int + sizes []int + } + tests := []struct { + name string + args args + cap int + }{ + { + name: "empty", + args: args{v: reflect.ValueOf([]example.Pod{})}, + cap: 0, + }, + { + name: "no sizes", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10}, + cap: 10, + }, + { + name: "above maxCapacity", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{1, 12}}, + cap: 10, + }, + { + name: "takes max", + args: args{v: reflect.ValueOf([]example.Pod{}), maxCapacity: 10, sizes: []int{8, 4}}, + cap: 8, + }, + { + name: "with existing capacity above max", + args: args{initialCapacity: 12, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 12, + }, + { + name: "with existing capacity below max", + args: args{initialCapacity: 5, maxCapacity: 10, sizes: []int{8, 4}}, + cap: 8, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.args.initialCapacity > 0 { + tt.args.v = reflect.ValueOf(make([]example.Pod, 0, tt.args.initialCapacity)) + } + // reflection requires that the value be addressible in order to call set, + // so we must ensure the value we created is available on the heap (not a problem + // for normal usage) + if !tt.args.v.CanAddr() { + x := reflect.New(tt.args.v.Type()) + x.Elem().Set(tt.args.v) + tt.args.v = x.Elem() + } + growSlice(tt.args.v, tt.args.maxCapacity, tt.args.sizes...) + if tt.cap != tt.args.v.Cap() { + t.Errorf("Unexpected capacity: got=%d want=%d", tt.args.v.Cap(), tt.cap) + } + }) + } +}