From 8952a0cb722b77459cf2701632a30f5b264f5aba Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 10 Aug 2017 22:31:51 -0400 Subject: [PATCH] Server side implementation of paging for etcd3 Add a feature gate in the apiserver to control whether paging can be used. Add controls to the storage factory that allow it to be disabled per resource. Use a JSON encoded continuation token that can be versioned. Create a 410 error if the continuation token is expired. Adds GetContinue() to ListMeta. --- .../garbagecollector/garbagecollector_test.go | 1 + pkg/features/kube_features.go | 1 + .../apimachinery/pkg/api/errors/errors.go | 27 ++ .../apimachinery/pkg/apis/meta/v1/meta.go | 4 + .../apis/meta/v1/unstructured/unstructured.go | 16 ++ .../pkg/test/api_meta_meta_test.go | 3 + .../apiserver/pkg/features/kube_features.go | 8 + .../pkg/registry/generic/registry/store.go | 2 + .../apiserver/pkg/server/options/etcd.go | 2 + .../k8s.io/apiserver/pkg/server/storage/BUILD | 2 + .../pkg/server/storage/storage_factory.go | 16 ++ .../src/k8s.io/apiserver/pkg/storage/BUILD | 2 + .../k8s.io/apiserver/pkg/storage/cacher.go | 18 +- .../pkg/storage/etcd/api_object_versioner.go | 3 +- .../apiserver/pkg/storage/etcd/etcd_helper.go | 4 +- .../pkg/storage/etcd/testing/utils.go | 1 + .../k8s.io/apiserver/pkg/storage/etcd3/BUILD | 3 +- .../apiserver/pkg/storage/etcd3/errors.go | 42 +++ .../apiserver/pkg/storage/etcd3/store.go | 164 ++++++++++-- .../apiserver/pkg/storage/etcd3/store_test.go | 251 +++++++++++++++--- .../apiserver/pkg/storage/etcd3/watcher.go | 31 +-- .../pkg/storage/etcd3/watcher_test.go | 4 +- .../apiserver/pkg/storage/interfaces.go | 5 +- .../pkg/storage/selection_predicate.go | 5 + .../pkg/storage/storagebackend/config.go | 5 + .../storage/storagebackend/factory/etcd3.go | 4 +- .../pkg/storage/tests/cacher_test.go | 2 +- .../src/k8s.io/client-go/tools/pager/pager.go | 11 + .../k8s.io/code-generator/Godeps/Godeps.json | 4 + 29 files changed, 542 insertions(+), 99 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index fa4d53a8d04..0d25e4f2a5c 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -420,6 +420,7 @@ func TestGCListWatcher(t *testing.T) { t.Fatal(err) } lw := listWatcher(client, podResource) + lw.DisablePaging = true if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil { t.Fatal(err) } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index e369e422b89..655ee252eb5 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -178,6 +178,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS genericfeatures.AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha}, genericfeatures.APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, genericfeatures.Initializers: {Default: false, PreRelease: utilfeature.Alpha}, + genericfeatures.APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from apiextensions-apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go index 98160227024..7f1e50b8280 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go @@ -174,6 +174,17 @@ func NewGone(message string) *StatusError { }} } +// NewResourceExpired creates an error that indicates that the requested resource content has expired from +// the server (usually due to a resourceVersion that is too old). +func NewResourceExpired(message string) *StatusError { + return &StatusError{metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusGone, + Reason: metav1.StatusReasonExpired, + Message: message, + }} +} + // NewInvalid returns an error indicating the item is invalid and cannot be processed. func NewInvalid(qualifiedKind schema.GroupKind, name string, errs field.ErrorList) *StatusError { causes := make([]metav1.StatusCause, 0, len(errs)) @@ -394,12 +405,28 @@ func IsInvalid(err error) bool { return reasonForError(err) == metav1.StatusReasonInvalid } +// IsGone is true if the error indicates the requested resource is no longer available. +func IsGone(err error) bool { + return reasonForError(err) == metav1.StatusReasonGone +} + +// IsResourceExpired is true if the error indicates the resource has expired and the current action is +// no longer possible. +func IsResourceExpired(err error) bool { + return reasonForError(err) == metav1.StatusReasonExpired +} + // IsMethodNotSupported determines if the err is an error which indicates the provided action could not // be performed because it is not supported by the server. func IsMethodNotSupported(err error) bool { return reasonForError(err) == metav1.StatusReasonMethodNotAllowed } +// IsServiceUnavailable is true if the error indicates the underlying service is no longer available. +func IsServiceUnavailable(err error) bool { + return reasonForError(err) == metav1.StatusReasonServiceUnavailable +} + // IsBadRequest determines if err is an error which indicates that the request is invalid. func IsBadRequest(err error) bool { return reasonForError(err) == metav1.StatusReasonBadRequest diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go index 5eccffcc7ec..c13fe4af8e0 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go @@ -90,6 +90,8 @@ type ListInterface interface { SetResourceVersion(version string) GetSelfLink() string SetSelfLink(selfLink string) + GetContinue() string + SetContinue(c string) } // Type exposes the type and APIVersion of versioned or internal API objects. @@ -105,6 +107,8 @@ func (meta *ListMeta) GetResourceVersion() string { return meta.ResourceV func (meta *ListMeta) SetResourceVersion(version string) { meta.ResourceVersion = version } func (meta *ListMeta) GetSelfLink() string { return meta.SelfLink } func (meta *ListMeta) SetSelfLink(selfLink string) { meta.SelfLink = selfLink } +func (meta *ListMeta) GetContinue() string { return meta.Continue } +func (meta *ListMeta) SetContinue(c string) { meta.Continue = c } func (obj *TypeMeta) GetObjectKind() schema.ObjectKind { return obj } diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go index d248c4aa610..2b991a8245b 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go @@ -468,6 +468,14 @@ func (u *Unstructured) SetSelfLink(selfLink string) { u.setNestedField(selfLink, "metadata", "selfLink") } +func (u *Unstructured) GetContinue() string { + return getNestedString(u.Object, "metadata", "continue") +} + +func (u *Unstructured) SetContinue(c string) { + u.setNestedField(c, "metadata", "continue") +} + func (u *Unstructured) GetCreationTimestamp() metav1.Time { var timestamp metav1.Time timestamp.UnmarshalQueryParameter(getNestedString(u.Object, "metadata", "creationTimestamp")) @@ -652,6 +660,14 @@ func (u *UnstructuredList) SetSelfLink(selfLink string) { u.setNestedField(selfLink, "metadata", "selfLink") } +func (u *UnstructuredList) GetContinue() string { + return getNestedString(u.Object, "metadata", "continue") +} + +func (u *UnstructuredList) SetContinue(c string) { + u.setNestedField(c, "metadata", "continue") +} + func (u *UnstructuredList) SetGroupVersionKind(gvk schema.GroupVersionKind) { u.SetAPIVersion(gvk.GroupVersion().String()) u.SetKind(gvk.Kind) diff --git a/staging/src/k8s.io/apimachinery/pkg/test/api_meta_meta_test.go b/staging/src/k8s.io/apimachinery/pkg/test/api_meta_meta_test.go index 68a40f53bf8..9da81d6d5c1 100644 --- a/staging/src/k8s.io/apimachinery/pkg/test/api_meta_meta_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/test/api_meta_meta_test.go @@ -199,6 +199,7 @@ type InternalTypeMeta struct { CreationTimestamp metav1.Time `json:"creationTimestamp,omitempty"` SelfLink string `json:"selfLink,omitempty"` ResourceVersion string `json:"resourceVersion,omitempty"` + Continue string `json:"next,omitempty"` APIVersion string `json:"apiVersion,omitempty"` Labels map[string]string `json:"labels,omitempty"` Annotations map[string]string `json:"annotations,omitempty"` @@ -210,6 +211,8 @@ func (m *InternalTypeMeta) GetResourceVersion() string { return m.ResourceVers func (m *InternalTypeMeta) SetResourceVersion(rv string) { m.ResourceVersion = rv } func (m *InternalTypeMeta) GetSelfLink() string { return m.SelfLink } func (m *InternalTypeMeta) SetSelfLink(link string) { m.SelfLink = link } +func (m *InternalTypeMeta) GetContinue() string { return m.Continue } +func (m *InternalTypeMeta) SetContinue(c string) { m.Continue = c } type MyAPIObject struct { TypeMeta InternalTypeMeta `json:",inline"` diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index f874cd90740..c2153184a61 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -54,6 +54,13 @@ const ( // Allow asynchronous coordination of object creation. // Auto-enabled by the Initializers admission plugin. Initializers utilfeature.Feature = "Initializers" + + // owner: @smarterclayton + // alpha: v1.8 + // + // Allow API clients to retrieve resource lists in chunks rather than + // all at once. + APIListChunking utilfeature.Feature = "APIListChunking" ) func init() { @@ -68,4 +75,5 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha}, APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha}, Initializers: {Default: false, PreRelease: utilfeature.Alpha}, + APIListChunking: {Default: false, PreRelease: utilfeature.Alpha}, } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 0b1ddc8c8c6..da15f299c05 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -293,6 +293,8 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection options = &metainternalversion.ListOptions{ResourceVersion: ""} } p.IncludeUninitialized = options.IncludeUninitialized + p.Limit = options.Limit + p.Continue = options.Continue list := e.NewListFunc() qualifiedResource := e.qualifiedResourceFromContext(ctx) if name, ok := p.MatchesSingle(); ok { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 16e2626de48..0a222ac79bc 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -34,6 +34,8 @@ import ( ) type EtcdOptions struct { + // The value of Paging on StorageConfig will be overriden by the + // calculated feature gate value. StorageConfig storagebackend.Config EncryptionProviderConfigFilepath string diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/BUILD b/staging/src/k8s.io/apiserver/pkg/server/storage/BUILD index 18b12853c09..9f9194d76f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/BUILD @@ -43,8 +43,10 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer/recognizer:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/features:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go index 2ceff3b6121..a540f0440d4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go @@ -27,8 +27,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/value" + utilfeature "k8s.io/apiserver/pkg/util/feature" ) // Backend describes the storage servers, the information here should be enough @@ -112,6 +114,8 @@ type groupResourceOverrides struct { decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder // transformer is optional and shall encrypt that resource at rest. transformer value.Transformer + // disablePaging will prevent paging on the provided resource. + disablePaging bool } // Apply overrides the provided config and options if the override has a value in that position @@ -138,6 +142,9 @@ func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *St if o.transformer != nil { config.Transformer = o.transformer } + if o.disablePaging { + config.Paging = false + } } var _ StorageFactory = &DefaultStorageFactory{} @@ -157,6 +164,7 @@ var specialDefaultResourcePrefixes = map[schema.GroupResource]string{ } func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType string, defaultSerializer runtime.StorageSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { + config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) if len(defaultMediaType) == 0 { defaultMediaType = runtime.ContentTypeJSON } @@ -185,6 +193,14 @@ func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource schema.GroupResource s.Overrides[groupResource] = overrides } +// SetDisableAPIListChunking allows a specific resource to disable paging at the storage layer, to prevent +// exposure of key names in continuations. This may be overriden by feature gates. +func (s *DefaultStorageFactory) SetDisableAPIListChunking(groupResource schema.GroupResource) { + overrides := s.Overrides[groupResource] + overrides.disablePaging = true + s.Overrides[groupResource] = overrides +} + // SetResourceEtcdPrefix sets the prefix for a resource, but not the base-dir. You'll end up in `etcdPrefix/resourceEtcdPrefix`. func (s *DefaultStorageFactory) SetResourceEtcdPrefix(groupResource schema.GroupResource, prefix string) { overrides := s.Overrides[groupResource] diff --git a/staging/src/k8s.io/apiserver/pkg/storage/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/BUILD index c8ea98ab703..f85dff5f977 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/BUILD @@ -64,6 +64,8 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/apiserver/pkg/features:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index ff164687ed2..d2f39030a1b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -37,6 +37,8 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" utiltrace "k8s.io/apiserver/pkg/util/trace" "k8s.io/client-go/tools/cache" ) @@ -406,9 +408,11 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob // Implements storage.Interface. func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { - if resourceVersion == "" { + pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) + if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) { // If resourceVersion is not specified, serve it from underlying - // storage (for backward compatibility). + // storage (for backward compatibility). If a continuation or limit is + // requested, serve it from the underlying storage as well. return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj) } @@ -459,7 +463,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri } } if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil { return err } } @@ -468,9 +472,11 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri // Implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error { - if resourceVersion == "" { + pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) + if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) { // If resourceVersion is not specified, serve it from underlying - // storage (for backward compatibility). + // storage (for backward compatibility). If a continuation or limit is + // requested, serve it from the underlying storage as well. return c.storage.List(ctx, key, resourceVersion, pred, listObj) } @@ -527,7 +533,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p } trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len())) if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil { return err } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go index 5b8583a2b57..71fd85b0bb4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/api_object_versioner.go @@ -43,7 +43,7 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin } // UpdateList implements Versioner -func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error { +func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string) error { listAccessor, err := meta.ListAccessor(obj) if err != nil || listAccessor == nil { return err @@ -53,6 +53,7 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6 versionString = strconv.FormatUint(resourceVersion, 10) } listAccessor.SetResourceVersion(versionString) + listAccessor.SetContinue(nextKey) return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go index 3d986f113ad..9c09c071c3e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go @@ -365,7 +365,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion return err } trace.Step("Object decoded") - if err := h.versioner.UpdateList(listObj, response.Index); err != nil { + if err := h.versioner.UpdateList(listObj, response.Index, ""); err != nil { return err } return nil @@ -445,7 +445,7 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin return err } trace.Step("Node list decoded") - if err := h.versioner.UpdateList(listObj, index); err != nil { + if err := h.versioner.UpdateList(listObj, index, ""); err != nil { return err } return nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go index 83fcbd5049c..c02d75b61a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/testing/utils.go @@ -322,6 +322,7 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T, scheme *runtime.Scheme) (*E ServerList: server.V3Client.Endpoints(), DeserializationCacheSize: etcdtest.DeserializationCacheSize, Copier: scheme, + Paging: true, } return server, config } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 51496d936e1..5cdfd13c143 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -41,6 +41,7 @@ go_library( name = "go_default_library", srcs = [ "compact.go", + "errors.go", "event.go", "store.go", "watcher.go", @@ -51,8 +52,8 @@ go_library( "//vendor/github.com/coreos/etcd/mvcc/mvccpb:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go new file mode 100644 index 00000000000..5aac30a146d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go @@ -0,0 +1,42 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "k8s.io/apimachinery/pkg/api/errors" + + etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" +) + +func interpretWatchError(err error) error { + switch { + case err == etcdrpc.ErrCompacted: + return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.") + } + return err +} + +func interpretListError(err error, paging bool) error { + switch { + case err == etcdrpc.ErrCompacted: + if paging { + return errors.NewResourceExpired("The provided from parameter is too old to display a consistent list result. You must start a new list without the from.") + } + return errors.NewResourceExpired("The resourceVersion for the provided list is too old.") + } + return err +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index d00a26d8922..69c715745cc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -18,10 +18,13 @@ package etcd3 import ( "bytes" + "encoding/base64" + "encoding/json" "errors" "fmt" "path" "reflect" + "strconv" "strings" "time" @@ -60,12 +63,13 @@ type store struct { client *clientv3.Client // getOpts contains additional options that should be passed // to all Get() calls. - getOps []clientv3.OpOption - codec runtime.Codec - versioner storage.Versioner - transformer value.Transformer - pathPrefix string - watcher *watcher + getOps []clientv3.OpOption + codec runtime.Codec + versioner storage.Versioner + transformer value.Transformer + pathPrefix string + watcher *watcher + pagingEnabled bool } type elemForDecode struct { @@ -82,23 +86,24 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface { - return newStore(c, true, codec, prefix, transformer) +func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { + return newStore(c, true, pagingEnabled, codec, prefix, transformer) } // NewWithNoQuorumRead returns etcd3 implementation of storage.Interface // where Get operations don't require quorum read. -func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface { - return newStore(c, false, codec, prefix, transformer) +func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { + return newStore(c, false, pagingEnabled, codec, prefix, transformer) } -func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := etcd.APIObjectVersioner{} result := &store{ - client: c, - codec: codec, - versioner: versioner, - transformer: transformer, + client: c, + codec: codec, + versioner: versioner, + transformer: transformer, + pagingEnabled: pagingEnabled, // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' @@ -386,7 +391,66 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin return err } // update version with cluster level revision - return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision)) + return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "") +} + +// continueToken is a simple structured object for encoding the state of a continue token. +// TODO: if we change the version of the encoded from, we can't start encoding the new version +// until all other servers are upgraded (i.e. we need to support rolling schema) +// This is a public API struct and cannot change. +type continueToken struct { + APIVersion string `json:"v"` + ResourceVersion int64 `json:"rv"` + StartKey string `json:"start"` +} + +// parseFrom transforms an encoded predicate from into a versioned struct. +// TODO: return a typed error that instructs clients that they must relist +func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) { + data, err := base64.RawURLEncoding.DecodeString(continueValue) + if err != nil { + return "", 0, fmt.Errorf("continue key is not valid: %v", err) + } + var c continueToken + if err := json.Unmarshal(data, &c); err != nil { + return "", 0, fmt.Errorf("continue key is not valid: %v", err) + } + switch c.APIVersion { + case "v1alpha1": + if c.ResourceVersion == 0 { + return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version v1alpha1)") + } + if len(c.StartKey) == 0 { + return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version v1alpha1)") + } + // defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot + // be at a higher level of the hierarchy, and so when we append the key prefix we will end up with + // continue start key that is fully qualified and cannot range over anything less specific than + // keyPrefix. + cleaned := path.Clean(c.StartKey) + if cleaned != c.StartKey || cleaned == "." || cleaned == "/" { + return "", 0, fmt.Errorf("continue key is not valid: %s", cleaned) + } + if len(cleaned) == 0 { + return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version 0)") + } + return keyPrefix + cleaned, c.ResourceVersion, nil + default: + return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion) + } +} + +// encodeContinue returns a string representing the encoded continuation of the current query. +func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) { + nextKey := strings.TrimPrefix(key, keyPrefix) + if nextKey == key { + return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match") + } + out, err := json.Marshal(&continueToken{APIVersion: "v1alpha1", ResourceVersion: resourceVersion, StartKey: nextKey}) + if err != nil { + return "", err + } + return base64.RawURLEncoding.EncodeToString(out), nil } // List implements storage.Interface.List. @@ -402,16 +466,50 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor if !strings.HasSuffix(key, "/") { key += "/" } - getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix()) + keyPrefix := key + + // set the appropriate clientv3 options to filter the returned data set + options := make([]clientv3.OpOption, 0, 4) + if s.pagingEnabled && pred.Limit > 0 { + options = append(options, clientv3.WithLimit(pred.Limit)) + } + var returnedRV int64 + switch { + case s.pagingEnabled && len(pred.Continue) > 0: + continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix) + if err != nil { + return err + } + + options = append(options, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key))) + key = continueKey + + options = append(options, clientv3.WithRev(continueRV)) + returnedRV = continueRV + + case len(resourceVersion) > 0: + fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) + if err != nil { + return fmt.Errorf("invalid resource version: %v", err) + } + + options = append(options, clientv3.WithPrefix(), clientv3.WithRev(fromRV)) + returnedRV = fromRV + + default: + options = append(options, clientv3.WithPrefix()) + } + + getResp, err := s.client.KV.Get(ctx, key, options...) if err != nil { - return err + return interpretListError(err, len(pred.Continue) > 0) } elems := make([]*elemForDecode, 0, len(getResp.Kvs)) for _, kv := range getResp.Kvs { data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key)) if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err)) + utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err)) continue } @@ -420,11 +518,31 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor rev: uint64(kv.ModRevision), }) } + if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil { return err } - // update version with cluster level revision - return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision)) + + // indicate to the client which resource version was returned + if returnedRV == 0 { + returnedRV = getResp.Header.Revision + } + switch { + case !getResp.More: + // no continuation + return s.versioner.UpdateList(listObj, uint64(returnedRV), "") + case len(getResp.Kvs) == 0: + return fmt.Errorf("no results were found, but etcd indicated there were more values") + default: + // we want to start immediately after the last key + // TODO: this reveals info about certain keys + key := string(getResp.Kvs[len(getResp.Kvs)-1].Key) + next, err := encodeContinue(key+"\x00", keyPrefix, returnedRV) + if err != nil { + return err + } + return s.versioner.UpdateList(listObj, uint64(returnedRV), next) + } } // Watch implements storage.Interface.Watch. @@ -548,8 +666,8 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP // decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. // On success, ListPtr would be set to the list of objects. -func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { - v, err := conversion.EnforcePtr(ListPtr) +func decodeList(elems []*elemForDecode, filter storage.FilterFunc, listPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { + v, err := conversion.EnforcePtr(listPtr) if err != nil || v.Kind() != reflect.Slice { panic("need ptr to slice") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index eb123f8554a..e46f73a9665 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -18,12 +18,17 @@ package etcd3 import ( "bytes" + "encoding/base64" + "encoding/json" "fmt" "reflect" "strconv" "sync" "testing" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/integration" + "golang.org/x/net/context" apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -37,10 +42,6 @@ import ( "k8s.io/apiserver/pkg/storage" storagetests "k8s.io/apiserver/pkg/storage/tests" "k8s.io/apiserver/pkg/storage/value" - - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/integration" - "golang.org/x/net/context" ) var scheme = runtime.NewScheme() @@ -587,7 +588,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { @@ -667,7 +668,8 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + disablePagingStore := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -704,40 +706,106 @@ func TestList(t *testing.T) { } } + list := &example.PodList{} + store.List(ctx, "/two-level", "0", storage.Everything, list) + continueRV, _ := strconv.Atoi(list.ResourceVersion) + secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV)) + if err != nil { + t.Fatal(err) + } + tests := []struct { - prefix string - pred storage.SelectionPredicate - expectedOut []*example.Pod - }{{ // test List on existing key - prefix: "/one-level/", - pred: storage.Everything, - expectedOut: []*example.Pod{preset[0].storedObj}, - }, { // test List on non-existing key - prefix: "/non-existing/", - pred: storage.Everything, - expectedOut: nil, - }, { // test List with pod name matching - prefix: "/one-level/", - pred: storage.SelectionPredicate{ - Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil - }, + disablePaging bool + prefix string + pred storage.SelectionPredicate + expectedOut []*example.Pod + expectContinue bool + }{ + { // test List on existing key + prefix: "/one-level/", + pred: storage.Everything, + expectedOut: []*example.Pod{preset[0].storedObj}, }, - expectedOut: nil, - }, { // test List with multiple levels of directories and expect flattened result - prefix: "/two-level/", - pred: storage.Everything, - expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, - }} + { // test List on non-existing key + prefix: "/non-existing/", + pred: storage.Everything, + expectedOut: nil, + }, + { // test List with pod name matching + prefix: "/one-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: nil, + }, + { // test List with limit + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: []*example.Pod{preset[1].storedObj}, + expectContinue: true, + }, + { // test List with limit when paging disabled + disablePaging: true, + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, + expectContinue: false, + }, + { // test List with pregenerated continue token + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + Continue: secondContinuation, + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: []*example.Pod{preset[2].storedObj}, + }, + { // test List with multiple levels of directories and expect flattened result + prefix: "/two-level/", + pred: storage.Everything, + expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, + }, + } for i, tt := range tests { out := &example.PodList{} - err := store.List(ctx, tt.prefix, "0", tt.pred, out) + var err error + if tt.disablePaging { + err = disablePagingStore.List(ctx, tt.prefix, "0", tt.pred, out) + } else { + err = store.List(ctx, tt.prefix, "0", tt.pred, out) + } if err != nil { - t.Fatalf("List failed: %v", err) + t.Fatalf("#%d: List failed: %v", i, err) + } + if (len(out.Continue) > 0) != tt.expectContinue { + t.Errorf("#%d: unexpected continue token: %v", i, out.Continue) } if len(tt.expectedOut) != len(out.Items) { t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) @@ -750,12 +818,75 @@ func TestList(t *testing.T) { } } } + + // test continuations + out := &example.PodList{} + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + } + } + if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) { + t.Fatalf("Unexpected first page: %#v", out.Items) + } + + continueFromSecondItem := out.Continue + + // no limit, should get two items + out = &example.PodList{} + if err := store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + if !reflect.DeepEqual(out.Items, []example.Pod{*preset[1].storedObj, *preset[2].storedObj}) { + key, rv, err := decodeContinue(continueFromSecondItem, "/") + t.Logf("continue token was %d %s %v", rv, key, err) + t.Fatalf("Unexpected second page: %#v", out.Items) + } + + // limit, should get two more pages + out = &example.PodList{} + if err := store.List(ctx, "/", "0", pred(1, continueFromSecondItem), out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) { + t.Fatalf("Unexpected second page: %#v", out.Items) + } + continueFromThirdItem := out.Continue + out = &example.PodList{} + if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) { + t.Fatalf("Unexpected third page: %#v", out.Items) + } } func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() return ctx, store, cluster } @@ -787,9 +918,57 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(cluster.RandClient(), false, codec, configuredPrefix, transformer) + store := newStore(cluster.RandClient(), false, true, codec, configuredPrefix, transformer) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } } } + +func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string { + out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey}) + if err != nil { + panic(err) + } + return base64.RawURLEncoding.EncodeToString(out) +} + +func Test_decodeContinue(t *testing.T) { + type args struct { + continueValue string + keyPrefix string + } + tests := []struct { + name string + args args + wantFromKey string + wantRv int64 + wantErr bool + }{ + {name: "valid", args: args{continueValue: encodeContinueOrDie("v1alpha1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"}, + + {name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true}, + + {name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true}, + {name: "path traversal - separator", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "/"), keyPrefix: "/test/"}, wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotFromKey, gotRv, err := decodeContinue(tt.args.continueValue, tt.args.keyPrefix) + if (err != nil) != tt.wantErr { + t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if gotFromKey != tt.wantFromKey { + t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey) + } + if gotRv != tt.wantRv { + t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 50b46371021..366e161cfa0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -19,20 +19,18 @@ package etcd3 import ( "errors" "fmt" - "net/http" "os" "strconv" "strings" "sync" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/value" "github.com/coreos/etcd/clientv3" - etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/golang/glog" "golang.org/x/net/context" ) @@ -139,7 +137,7 @@ func (wc *watchChan) run() { if err == context.Canceled { break } - errResult := parseError(err) + errResult := transformErrorToEvent(err) if errResult != nil { // error result is guaranteed to be received by user before closing ResultChan. select { @@ -319,28 +317,15 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { return res } -func parseError(err error) *watch.Event { - var status *metav1.Status - switch { - case err == etcdrpc.ErrCompacted: - status = &metav1.Status{ - Status: metav1.StatusFailure, - Message: err.Error(), - Code: http.StatusGone, - Reason: metav1.StatusReasonExpired, - } - default: - status = &metav1.Status{ - Status: metav1.StatusFailure, - Message: err.Error(), - Code: http.StatusInternalServerError, - Reason: metav1.StatusReasonInternalError, - } +func transformErrorToEvent(err error) *watch.Event { + err = interpretWatchError(err) + if _, ok := err.(apierrs.APIStatus); !ok { + err = apierrs.NewInternalError(err) } - + status := err.(apierrs.APIStatus).Status() return &watch.Event{ Type: watch.Error, - Object: status, + Object: &status, } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 052e4dc73a9..8c885c60f4e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -226,13 +226,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + invalidStore := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + validStore := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 94fb58f40b2..0d81f05c3d9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -36,8 +36,9 @@ type Versioner interface { UpdateObject(obj runtime.Object, resourceVersion uint64) error // UpdateList sets the resource version into an API list object. Returns an error if the object // cannot be updated correctly. May return nil if the requested object does not need metadata - // from database. - UpdateList(obj runtime.Object, resourceVersion uint64) error + // from database. continueValue is optional and indicates that more results are available if + // the client passes that value to the server in a subsequent call. + UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error // PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should // return an error if the specified object cannot be updated. PrepareObjectForStorage(obj runtime.Object) error diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index 8421e2103de..83e423a97ab 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -76,6 +76,8 @@ type SelectionPredicate struct { IncludeUninitialized bool GetAttrs AttrFunc IndexFields []string + Limit int64 + Continue string } // Matches returns true if the given object's labels and fields (as @@ -118,6 +120,9 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set, // MatchesSingle will return (name, true) if and only if s.Field matches on the object's // name. func (s *SelectionPredicate) MatchesSingle() (string, bool) { + if len(s.Continue) > 0 { + return "", false + } // TODO: should be namespace.name if name, ok := s.Field.RequiresExactMatch("metadata.name"); ok { return name, true diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 2a88ddf7453..919b11571cb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -41,6 +41,11 @@ type Config struct { CAFile string // Quorum indicates that whether read operations should be quorum-level consistent. Quorum bool + // Paging indicates whether the server implementation should allow paging (if it is + // supported). This is generally configured by feature gating, or by a specific + // resource type not wishing to allow paging, and is not intended for end users to + // set. + Paging bool // DeserializationCacheSize is the size of cache of deserialized objects. // Currently this is only supported in etcd2. // We will drop the cache once using protobuf. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 2c838a09ce7..9ec49f7df01 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -61,7 +61,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e transformer = value.IdentityTransformer } if c.Quorum { - return etcd3.New(client, c.Codec, c.Prefix, transformer), destroyFunc, nil + return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil } - return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer), destroyFunc, nil + return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 924bae6f057..c3f07381340 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -93,7 +93,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) { server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true) return server, storage } diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go index 57d9632d78a..a4a04cdc325 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager.go @@ -30,14 +30,20 @@ import ( const defaultPageSize = 500 +// ListPageFunc returns a list object for the given list options. type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) +// SimplePageFunc adapts a context-less list function into one that accepts a context. func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc { return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return fn(opts) } } +// ListPager assists client code in breaking large list queries into multiple +// smaller chunks of PageSize or smaller. PageFn is expected to accept a +// metav1.ListOptions that supports paging and return a list. The pager does +// not alter the field or label selectors on the initial options list. type ListPager struct { PageSize int64 PageFn ListPageFunc @@ -45,6 +51,8 @@ type ListPager struct { FullListIfExpired bool } +// New creates a new pager from the provided pager function using the default +// options. func New(fn ListPageFunc) *ListPager { return &ListPager{ PageSize: defaultPageSize, @@ -53,6 +61,9 @@ func New(fn ListPageFunc) *ListPager { } } +// List returns a single list object, but attempts to retrieve smaller chunks from the +// server to reduce the impact on the server. If the chunk attempt fails, it will load +// the full list instead. func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { if options.Limit == 0 { options.Limit = p.PageSize diff --git a/staging/src/k8s.io/code-generator/Godeps/Godeps.json b/staging/src/k8s.io/code-generator/Godeps/Godeps.json index 29608c7c695..59f27b6596c 100644 --- a/staging/src/k8s.io/code-generator/Godeps/Godeps.json +++ b/staging/src/k8s.io/code-generator/Godeps/Godeps.json @@ -238,6 +238,10 @@ "ImportPath": "github.com/spf13/pflag", "Rev": "9ff6c6923cfffbcd502984b8e0c80539a94968b7" }, + { + "ImportPath": "golang.org/x/net/context", + "Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f" + }, { "ImportPath": "golang.org/x/net/http2", "Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f"