From 331eea67d8000e5c4b37e2234a90903c15881c2f Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 12 Nov 2016 10:17:54 -0800 Subject: [PATCH] Allow initialization of resources Add support for creating resources that are not immediately visible to naive clients, but must first be initialized by one or more privileged cluster agents. These controllers can mark the object as initialized, allowing others to see them. Permission to override initialization defaults or modify an initializing object is limited per resource to a virtual subresource "RESOURCE/initialize" via RBAC. Initialization is currently alpha. --- cmd/kube-apiserver/app/BUILD | 1 + cmd/kube-apiserver/app/plugins.go | 2 + federation/cmd/federation-apiserver/app/BUILD | 1 + .../cmd/federation-apiserver/app/plugins.go | 2 + .../authentication/tokenreview/storage.go | 4 + .../localsubjectaccessreview/rest.go | 4 + .../selfsubjectaccessreview/rest.go | 4 + .../authorization/subjectaccessreview/rest.go | 4 + pkg/registry/core/pod/storage/eviction.go | 5 + pkg/registry/core/pod/storage/storage.go | 5 + pkg/registry/core/service/rest.go | 5 + .../extensions/deployment/storage/storage.go | 4 + .../rbac/clusterrole/policybased/storage.go | 4 + .../clusterrolebinding/policybased/storage.go | 4 + pkg/registry/rbac/role/policybased/storage.go | 4 + .../rbac/rolebinding/policybased/storage.go | 4 + plugin/BUILD | 1 + plugin/pkg/admission/initialization/BUILD | 38 +++ .../initialization/initialization.go | 169 ++++++++++ .../pkg/api/validation/objectmeta.go | 60 +++- .../apis/meta/internalversion/conversion.go | 3 + .../pkg/apis/meta/internalversion/types.go | 2 +- .../pkg/apis/meta/v1/unstructured/BUILD | 2 + .../apis/meta/v1/unstructured/unstructured.go | 28 +- .../pkg/conversion/unstructured/BUILD | 1 - .../pkg/conversion/unstructured/converter.go | 33 +- .../apiserver/pkg/endpoints/apiserver_test.go | 52 +++- .../apiserver/pkg/endpoints/handlers/rest.go | 27 +- .../pkg/registry/generic/registry/store.go | 119 +++++++- .../registry/generic/registry/store_test.go | 288 ++++++++++++++++-- .../apiserver/pkg/registry/rest/rest.go | 10 +- .../pkg/registry/rest/resttest/resttest.go | 36 +-- .../apiserver/pkg/server/options/admission.go | 2 +- .../k8s.io/apiserver/pkg/storage/cacher.go | 14 +- .../pkg/storage/cacher_whitebox_test.go | 4 +- .../pkg/storage/etcd/etcd_helper_test.go | 4 +- .../apiserver/pkg/storage/etcd3/store_test.go | 8 +- .../apiserver/pkg/storage/etcd3/watcher.go | 2 +- .../pkg/storage/etcd3/watcher_test.go | 8 +- .../apiserver/pkg/storage/interfaces.go | 2 + .../pkg/storage/selection_predicate.go | 41 ++- .../pkg/storage/selection_predicate_test.go | 13 +- .../pkg/storage/tests/cacher_test.go | 10 +- .../apiserver/pkg/storage/watch_cache.go | 64 ++-- .../apiserver/pkg/storage/watch_cache_test.go | 4 +- .../pkg/registry/apiservice/strategy.go | 6 +- .../pkg/registry/customresource/strategy.go | 6 +- .../customresourcedefinition/strategy.go | 6 +- .../pkg/registry/wardle/strategy.go | 6 +- test/e2e/BUILD | 2 + test/e2e/e2e_test.go | 1 + test/e2e/extension/BUILD | 36 +++ test/e2e/extension/initializers.go | 121 ++++++++ 53 files changed, 1118 insertions(+), 168 deletions(-) create mode 100644 plugin/pkg/admission/initialization/BUILD create mode 100644 plugin/pkg/admission/initialization/initialization.go create mode 100644 test/e2e/extension/BUILD create mode 100644 test/e2e/extension/initializers.go diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index 46a7a00e941..1377ae4916a 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -52,6 +52,7 @@ go_library( "//plugin/pkg/admission/exec:go_default_library", "//plugin/pkg/admission/gc:go_default_library", "//plugin/pkg/admission/imagepolicy:go_default_library", + "//plugin/pkg/admission/initialization:go_default_library", "//plugin/pkg/admission/initialresources:go_default_library", "//plugin/pkg/admission/limitranger:go_default_library", "//plugin/pkg/admission/namespace/autoprovision:go_default_library", diff --git a/cmd/kube-apiserver/app/plugins.go b/cmd/kube-apiserver/app/plugins.go index f287a9a47d0..deab5b6f232 100644 --- a/cmd/kube-apiserver/app/plugins.go +++ b/cmd/kube-apiserver/app/plugins.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/exec" "k8s.io/kubernetes/plugin/pkg/admission/gc" "k8s.io/kubernetes/plugin/pkg/admission/imagepolicy" + "k8s.io/kubernetes/plugin/pkg/admission/initialization" "k8s.io/kubernetes/plugin/pkg/admission/initialresources" "k8s.io/kubernetes/plugin/pkg/admission/limitranger" "k8s.io/kubernetes/plugin/pkg/admission/namespace/autoprovision" @@ -59,6 +60,7 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) { exec.Register(plugins) gc.Register(plugins) imagepolicy.Register(plugins) + initialization.Register(plugins) initialresources.Register(plugins) limitranger.Register(plugins) autoprovision.Register(plugins) diff --git a/federation/cmd/federation-apiserver/app/BUILD b/federation/cmd/federation-apiserver/app/BUILD index 0966608dce0..830442e65d1 100644 --- a/federation/cmd/federation-apiserver/app/BUILD +++ b/federation/cmd/federation-apiserver/app/BUILD @@ -67,6 +67,7 @@ go_library( "//plugin/pkg/admission/admit:go_default_library", "//plugin/pkg/admission/deny:go_default_library", "//plugin/pkg/admission/gc:go_default_library", + "//plugin/pkg/admission/initialization:go_default_library", "//vendor/github.com/go-openapi/spec:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", diff --git a/federation/cmd/federation-apiserver/app/plugins.go b/federation/cmd/federation-apiserver/app/plugins.go index 61c5902eb12..168a0ef6e64 100644 --- a/federation/cmd/federation-apiserver/app/plugins.go +++ b/federation/cmd/federation-apiserver/app/plugins.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/admit" "k8s.io/kubernetes/plugin/pkg/admission/deny" "k8s.io/kubernetes/plugin/pkg/admission/gc" + "k8s.io/kubernetes/plugin/pkg/admission/initialization" ) // registerAllAdmissionPlugins registers all admission plugins @@ -35,4 +36,5 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) { admit.Register(plugins) deny.Register(plugins) gc.Register(plugins) + initialization.Register(plugins) } diff --git a/pkg/registry/authentication/tokenreview/storage.go b/pkg/registry/authentication/tokenreview/storage.go index 3c681ef9d2a..f1ddd5a1572 100644 --- a/pkg/registry/authentication/tokenreview/storage.go +++ b/pkg/registry/authentication/tokenreview/storage.go @@ -76,3 +76,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return tokenReview, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/authorization/localsubjectaccessreview/rest.go b/pkg/registry/authorization/localsubjectaccessreview/rest.go index 379a42b96ba..61d526a39e6 100644 --- a/pkg/registry/authorization/localsubjectaccessreview/rest.go +++ b/pkg/registry/authorization/localsubjectaccessreview/rest.go @@ -69,3 +69,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return localSubjectAccessReview, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/authorization/selfsubjectaccessreview/rest.go b/pkg/registry/authorization/selfsubjectaccessreview/rest.go index 94b4c06d727..1c77f9351bc 100644 --- a/pkg/registry/authorization/selfsubjectaccessreview/rest.go +++ b/pkg/registry/authorization/selfsubjectaccessreview/rest.go @@ -72,3 +72,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return selfSAR, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/authorization/subjectaccessreview/rest.go b/pkg/registry/authorization/subjectaccessreview/rest.go index 5b984b4c8bf..3b54f1f4275 100644 --- a/pkg/registry/authorization/subjectaccessreview/rest.go +++ b/pkg/registry/authorization/subjectaccessreview/rest.go @@ -62,3 +62,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return subjectAccessReview, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/core/pod/storage/eviction.go b/pkg/registry/core/pod/storage/eviction.go index 214680cce0a..acda808a0df 100644 --- a/pkg/registry/core/pod/storage/eviction.go +++ b/pkg/registry/core/pod/storage/eviction.go @@ -145,6 +145,11 @@ func (r *EvictionREST) Create(ctx genericapirequest.Context, obj runtime.Object) return &metav1.Status{Status: metav1.StatusSuccess}, nil } +// CreateInitialized will ensure the pod is evicted. +func (r *EvictionREST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} + // checkAndDecrement checks if the provided PodDisruptionBudget allows any disruption. func (r *EvictionREST) checkAndDecrement(namespace string, podName string, pdb policy.PodDisruptionBudget) (ok bool, err error) { if pdb.Status.ObservedGeneration < pdb.Generation { diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index d4107fff034..79964908ddb 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -143,6 +143,11 @@ func (r *BindingREST) Create(ctx genericapirequest.Context, obj runtime.Object) return } +// CreateInitialized will ensure the pod is bound. +func (r *BindingREST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} + // setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if it was // previously 'oldMachine' and merges the provided annotations with those of the pod. // Returns the current state of the pod, or an error. diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/rest.go index eadf3050319..cc8ec6472e6 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/rest.go @@ -192,6 +192,11 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runti return out, err } +// TODO: fix services to support initialization by using generic.Store +func (rs *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return rs.Create(ctx, obj) +} + func (rs *REST) Delete(ctx genericapirequest.Context, id string) (runtime.Object, error) { service, err := rs.registry.GetService(ctx, id, &metav1.GetOptions{}) if err != nil { diff --git a/pkg/registry/extensions/deployment/storage/storage.go b/pkg/registry/extensions/deployment/storage/storage.go index 2b65ec9aa9f..22a28882144 100644 --- a/pkg/registry/extensions/deployment/storage/storage.go +++ b/pkg/registry/extensions/deployment/storage/storage.go @@ -144,6 +144,10 @@ func (r *RollbackREST) Create(ctx genericapirequest.Context, obj runtime.Object) }, nil } +func (r *RollbackREST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} + func (r *RollbackREST) rollbackDeployment(ctx genericapirequest.Context, deploymentID string, config *extensions.RollbackConfig, annotations map[string]string) error { if _, err := r.setDeploymentRollback(ctx, deploymentID, config, annotations); err != nil { err = storeerr.InterpretGetError(err, extensions.Resource("deployments"), deploymentID) diff --git a/pkg/registry/rbac/clusterrole/policybased/storage.go b/pkg/registry/rbac/clusterrole/policybased/storage.go index 12ad18b5859..0d56bd5ddc9 100644 --- a/pkg/registry/rbac/clusterrole/policybased/storage.go +++ b/pkg/registry/rbac/clusterrole/policybased/storage.go @@ -52,6 +52,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go index 6794f88c696..6794c583f8a 100644 --- a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go +++ b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go @@ -63,6 +63,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/pkg/registry/rbac/role/policybased/storage.go b/pkg/registry/rbac/role/policybased/storage.go index 879b10419f8..fb51b37be79 100644 --- a/pkg/registry/rbac/role/policybased/storage.go +++ b/pkg/registry/rbac/role/policybased/storage.go @@ -52,6 +52,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/pkg/registry/rbac/rolebinding/policybased/storage.go b/pkg/registry/rbac/rolebinding/policybased/storage.go index ad53ea3a6c2..13adaa1dcac 100644 --- a/pkg/registry/rbac/rolebinding/policybased/storage.go +++ b/pkg/registry/rbac/rolebinding/policybased/storage.go @@ -69,6 +69,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/plugin/BUILD b/plugin/BUILD index 2af6b49bbce..5209dbdd37a 100644 --- a/plugin/BUILD +++ b/plugin/BUILD @@ -22,6 +22,7 @@ filegroup( "//plugin/pkg/admission/exec:all-srcs", "//plugin/pkg/admission/gc:all-srcs", "//plugin/pkg/admission/imagepolicy:all-srcs", + "//plugin/pkg/admission/initialization:all-srcs", "//plugin/pkg/admission/initialresources:all-srcs", "//plugin/pkg/admission/limitranger:all-srcs", "//plugin/pkg/admission/namespace/autoprovision:all-srcs", diff --git a/plugin/pkg/admission/initialization/BUILD b/plugin/pkg/admission/initialization/BUILD new file mode 100644 index 00000000000..88838997ef7 --- /dev/null +++ b/plugin/pkg/admission/initialization/BUILD @@ -0,0 +1,38 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["initialization.go"], + tags = ["automanaged"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/validation:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", + "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", + "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/plugin/pkg/admission/initialization/initialization.go b/plugin/pkg/admission/initialization/initialization.go new file mode 100644 index 00000000000..fe7db064f7a --- /dev/null +++ b/plugin/pkg/admission/initialization/initialization.go @@ -0,0 +1,169 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package initialization + +import ( + "fmt" + "io" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/validation" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/authorization/authorizer" +) + +// Register registers a plugin +func Register(plugins *admission.Plugins) { + plugins.Register("Initializers", func(config io.Reader) (admission.Interface, error) { + return NewInitializer(), nil + }) +} + +type initializerOptions struct { + Initializers []string +} + +type initializer struct { + resources map[schema.GroupResource]initializerOptions + authorizer authorizer.Authorizer +} + +// NewAlwaysAdmit creates a new always admit admission handler +func NewInitializer() admission.Interface { + return &initializer{ + resources: map[schema.GroupResource]initializerOptions{ + //schema.GroupResource{Resource: "pods"}: {Initializers: []string{"Test"}}, + }, + } +} + +func (i *initializer) Validate() error { + if i.authorizer == nil { + return fmt.Errorf("requires authorizer") + } + return nil +} + +func (i *initializer) SetAuthorizer(a authorizer.Authorizer) { + i.authorizer = a +} + +var initializerFieldPath = field.NewPath("metadata", "initializers") + +func (i *initializer) Admit(a admission.Attributes) (err error) { + // TODO: sub-resource action should be denied until the object is initialized + if len(a.GetSubresource()) > 0 { + return nil + } + + resource, ok := i.resources[a.GetResource().GroupResource()] + if !ok { + return nil + } + + switch a.GetOperation() { + case admission.Create: + accessor, err := meta.Accessor(a.GetObject()) + if err != nil { + // objects without meta accessor cannot be checked for initialization, and it is possible to make calls + // via our API that don't have ObjectMeta + return nil + } + existing := accessor.GetInitializers() + // it must be possible for some users to bypass initialization - for now, check the initialize operation + if existing != nil { + if err := i.canInitialize(a); err != nil { + return err + } + } + + // TODO: pull this from config + accessor.SetInitializers(copiedInitializers(resource.Initializers)) + + case admission.Update: + accessor, err := meta.Accessor(a.GetObject()) + if err != nil { + // objects without meta accessor cannot be checked for initialization, and it is possible to make calls + // via our API that don't have ObjectMeta + return nil + } + updated := accessor.GetInitializers() + + existingAccessor, err := meta.Accessor(a.GetOldObject()) + if err != nil { + // if the old object does not have an accessor, but the new one does, error out + return fmt.Errorf("initialized resources must be able to set initializers (%T): %v", a.GetOldObject(), err) + } + existing := existingAccessor.GetInitializers() + + // because we are called before validation, we need to ensure the update transition is valid. + if errs := validation.ValidateInitializersUpdate(updated, existing, initializerFieldPath); len(errs) > 0 { + return errors.NewInvalid(a.GetKind().GroupKind(), a.GetName(), errs) + } + + // caller must have the ability to mutate un-initialized resources + if err := i.canInitialize(a); err != nil { + return err + } + + // TODO: restrict initialization list changes to specific clients? + } + + return nil +} + +func (i *initializer) canInitialize(a admission.Attributes) error { + // caller must have the ability to mutate un-initialized resources + authorized, reason, err := i.authorizer.Authorize(authorizer.AttributesRecord{ + Name: a.GetName(), + ResourceRequest: true, + User: a.GetUserInfo(), + Verb: "initialize", + Namespace: a.GetNamespace(), + APIGroup: a.GetResource().Group, + APIVersion: a.GetResource().Version, + Resource: a.GetResource().Resource, + }) + if err != nil { + return err + } + if !authorized { + return fmt.Errorf("user must have permission to initialize resources: %s", reason) + } + return nil +} + +func (i *initializer) Handles(op admission.Operation) bool { + return true +} + +func copiedInitializers(names []string) *metav1.Initializers { + if len(names) == 0 { + return nil + } + var init []metav1.Initializer + for _, name := range names { + init = append(init, metav1.Initializer{Name: name}) + } + return &metav1.Initializers{ + Pending: init, + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go b/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go index 04f605843df..84bd9cdedc6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go @@ -184,10 +184,41 @@ func ValidateObjectMetaAccessor(meta metav1.Object, requiresNamespace bool, name allErrs = append(allErrs, v1validation.ValidateLabels(meta.GetLabels(), fldPath.Child("labels"))...) allErrs = append(allErrs, ValidateAnnotations(meta.GetAnnotations(), fldPath.Child("annotations"))...) allErrs = append(allErrs, ValidateOwnerReferences(meta.GetOwnerReferences(), fldPath.Child("ownerReferences"))...) + allErrs = append(allErrs, ValidateInitializers(meta.GetInitializers(), fldPath.Child("initializers"))...) allErrs = append(allErrs, ValidateFinalizers(meta.GetFinalizers(), fldPath.Child("finalizers"))...) return allErrs } +func ValidateInitializers(initializers *metav1.Initializers, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + if initializers == nil { + return allErrs + } + for i, initializer := range initializers.Pending { + for _, msg := range validation.IsQualifiedName(initializer.Name) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("pending").Index(i), initializer.Name, msg)) + } + } + allErrs = append(allErrs, validateInitializersResult(initializers.Result, fldPath.Child("result"))...) + if len(initializers.Pending) == 0 && initializers.Result == nil { + allErrs = append(allErrs, field.Invalid(fldPath.Child("pending"), nil, "must be non-empty when result is not set")) + } + return allErrs +} + +func validateInitializersResult(result *metav1.Status, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + if result == nil { + return allErrs + } + switch result.Status { + case metav1.StatusFailure: + default: + allErrs = append(allErrs, field.Invalid(fldPath.Child("status"), result.Status, "must be 'Failure'")) + } + return allErrs +} + // ValidateFinalizers tests if the finalizers name are valid, and if there are conflicting finalizers. func ValidateFinalizers(finalizers []string, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} @@ -226,7 +257,7 @@ func ValidateObjectMetaUpdate(newMeta, oldMeta *metav1.ObjectMeta, fldPath *fiel } func ValidateObjectMetaAccessorUpdate(newMeta, oldMeta metav1.Object, fldPath *field.Path) field.ErrorList { - allErrs := field.ErrorList{} + var allErrs field.ErrorList if !RepairMalformedUpdates && newMeta.GetUID() != oldMeta.GetUID() { allErrs = append(allErrs, field.Invalid(fldPath.Child("uid"), newMeta.GetUID(), "field is immutable")) @@ -276,6 +307,8 @@ func ValidateObjectMetaAccessorUpdate(newMeta, oldMeta metav1.Object, fldPath *f allErrs = append(allErrs, field.Invalid(fldPath.Child("generation"), newMeta.GetGeneration(), "must not be decremented")) } + allErrs = append(allErrs, ValidateInitializersUpdate(newMeta.GetInitializers(), oldMeta.GetInitializers(), fldPath.Child("initializers"))...) + allErrs = append(allErrs, ValidateImmutableField(newMeta.GetName(), oldMeta.GetName(), fldPath.Child("name"))...) allErrs = append(allErrs, ValidateImmutableField(newMeta.GetNamespace(), oldMeta.GetNamespace(), fldPath.Child("namespace"))...) allErrs = append(allErrs, ValidateImmutableField(newMeta.GetUID(), oldMeta.GetUID(), fldPath.Child("uid"))...) @@ -288,3 +321,28 @@ func ValidateObjectMetaAccessorUpdate(newMeta, oldMeta metav1.Object, fldPath *f return allErrs } + +// ValidateInitializersUpdate checks the update of the metadata initializers field +func ValidateInitializersUpdate(newInit, oldInit *metav1.Initializers, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + switch { + case oldInit == nil && newInit != nil: + // Initializers may not be set on new objects + allErrs = append(allErrs, field.Invalid(fldPath, nil, "field is immutable once initialization has completed")) + case oldInit != nil && newInit == nil: + // this is a valid transition and means initialization was successful + case oldInit != nil && newInit != nil: + // validate changes to initializers + switch { + case oldInit.Result == nil && newInit.Result != nil: + // setting a result is allowed + allErrs = append(allErrs, validateInitializersResult(newInit.Result, fldPath.Child("result"))...) + case oldInit.Result != nil: + // setting Result implies permanent failure, and all future updates will be prevented + allErrs = append(allErrs, ValidateImmutableField(newInit.Result, oldInit.Result, fldPath.Child("result"))...) + default: + // leaving the result nil is allowed + } + } + return allErrs +} diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go index 73e7d475624..eff9a7e12e4 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go @@ -18,6 +18,7 @@ package internalversion import ( "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/util/validation/field" @@ -30,6 +31,7 @@ func Convert_internalversion_ListOptions_To_v1_ListOptions(in *ListOptions, out if err := metav1.Convert_labels_Selector_To_string(&in.LabelSelector, &out.LabelSelector, s); err != nil { return err } + out.IncludeUninitialized = in.IncludeUninitialized out.ResourceVersion = in.ResourceVersion out.TimeoutSeconds = in.TimeoutSeconds out.Watch = in.Watch @@ -43,6 +45,7 @@ func Convert_v1_ListOptions_To_internalversion_ListOptions(in *metav1.ListOption if err := metav1.Convert_string_To_labels_Selector(&in.LabelSelector, &out.LabelSelector, s); err != nil { return err } + out.IncludeUninitialized = in.IncludeUninitialized out.ResourceVersion = in.ResourceVersion out.TimeoutSeconds = in.TimeoutSeconds out.Watch = in.Watch diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go index 576baeac64e..0aa4188df2d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go @@ -33,7 +33,7 @@ type ListOptions struct { FieldSelector fields.Selector // If true, partially initialized resources are included in the response. // +optional - IncludeUninitialized bool `json:"includeUninitialized,omitempty"` + IncludeUninitialized bool // If true, watch for changes to this list Watch bool // When specified with a watch call, shows changes that occur after that particular version of a resource. diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD index 0b4366c93ae..b6b0d2bb5d4 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD @@ -22,9 +22,11 @@ go_library( deps = [ "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/conversion/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", ], ) diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go index a74ed02689a..d9a4f887b7a 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go @@ -27,10 +27,12 @@ import ( "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) // Unstructured allows objects that do not have Golang structs registered to be manipulated @@ -452,12 +454,34 @@ func (u *Unstructured) GroupVersionKind() schema.GroupVersionKind { return gvk } +var converter = unstructured.NewConverter(false) + func (u *Unstructured) GetInitializers() *metav1.Initializers { - panic("not implemented") + field := getNestedField(u.Object, "metadata", "initializers") + if field == nil { + return nil + } + obj, ok := field.(map[string]interface{}) + if !ok { + return nil + } + out := &metav1.Initializers{} + if err := converter.FromUnstructured(obj, out); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to retrieve initializers for object: %v", err)) + } + return out } func (u *Unstructured) SetInitializers(initializers *metav1.Initializers) { - panic("not implemented") + if initializers == nil { + setNestedField(u.Object, nil, "metadata", "initializers") + return + } + out := make(map[string]interface{}) + if err := converter.ToUnstructured(initializers, &out); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to retrieve initializers for object: %v", err)) + } + setNestedField(u.Object, out, "metadata", "initializers") } func (u *Unstructured) GetFinalizers() []string { diff --git a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD index b0337b40f4c..0804afc3301 100644 --- a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD @@ -31,7 +31,6 @@ go_library( deps = [ "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go index fc28406b0b5..cf84a619898 100644 --- a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go +++ b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go @@ -29,7 +29,6 @@ import ( "sync/atomic" apiequality "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/json" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -37,11 +36,11 @@ import ( "github.com/golang/glog" ) -// Converter is an interface for converting between runtime.Object +// Converter is an interface for converting between interface{} // and map[string]interface representation. type Converter interface { - ToUnstructured(obj runtime.Object, u *map[string]interface{}) error - FromUnstructured(u map[string]interface{}, obj runtime.Object) error + ToUnstructured(obj interface{}, u *map[string]interface{}) error + FromUnstructured(u map[string]interface{}, obj interface{}) error } type structField struct { @@ -92,7 +91,7 @@ func parseBool(key string) bool { return value } -// ConverterImpl knows how to convert betweek runtime.Object and +// ConverterImpl knows how to convert between interface{} and // Unstructured in both ways. type converterImpl struct { // If true, we will be additionally running conversion via json @@ -107,10 +106,15 @@ func NewConverter(mismatchDetection bool) Converter { } } -func (c *converterImpl) FromUnstructured(u map[string]interface{}, obj runtime.Object) error { - err := fromUnstructured(reflect.ValueOf(u), reflect.ValueOf(obj).Elem()) +func (c *converterImpl) FromUnstructured(u map[string]interface{}, obj interface{}) error { + t := reflect.TypeOf(obj) + value := reflect.ValueOf(obj) + if t.Kind() != reflect.Ptr || value.IsNil() { + return fmt.Errorf("FromUnstructured requires a non-nil pointer to an object, got %v", t) + } + err := fromUnstructured(reflect.ValueOf(u), value.Elem()) if c.mismatchDetection { - newObj := reflect.New(reflect.TypeOf(obj).Elem()).Interface().(runtime.Object) + newObj := reflect.New(t.Elem()).Interface() newErr := fromUnstructuredViaJSON(u, newObj) if (err != nil) != (newErr != nil) { glog.Fatalf("FromUnstructured unexpected error for %v: error: %v", u, err) @@ -122,7 +126,7 @@ func (c *converterImpl) FromUnstructured(u map[string]interface{}, obj runtime.O return err } -func fromUnstructuredViaJSON(u map[string]interface{}, obj runtime.Object) error { +func fromUnstructuredViaJSON(u map[string]interface{}, obj interface{}) error { data, err := json.Marshal(u) if err != nil { return err @@ -384,8 +388,13 @@ func interfaceFromUnstructured(sv, dv reflect.Value) error { return nil } -func (c *converterImpl) ToUnstructured(obj runtime.Object, u *map[string]interface{}) error { - err := toUnstructured(reflect.ValueOf(obj).Elem(), reflect.ValueOf(u).Elem()) +func (c *converterImpl) ToUnstructured(obj interface{}, u *map[string]interface{}) error { + t := reflect.TypeOf(obj) + value := reflect.ValueOf(obj) + if t.Kind() != reflect.Ptr || value.IsNil() { + return fmt.Errorf("ToUnstructured requires a non-nil pointer to an object, got %v", t) + } + err := toUnstructured(value.Elem(), reflect.ValueOf(u).Elem()) if c.mismatchDetection { newUnstr := &map[string]interface{}{} newErr := toUnstructuredViaJSON(obj, newUnstr) @@ -399,7 +408,7 @@ func (c *converterImpl) ToUnstructured(obj runtime.Object, u *map[string]interfa return err } -func toUnstructuredViaJSON(obj runtime.Object, u *map[string]interface{}) error { +func toUnstructuredViaJSON(obj interface{}, u *map[string]interface{}) error { data, err := json.Marshal(obj) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 24f0698f9ee..467e7dc5777 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -407,6 +407,7 @@ type SimpleRESTStorage struct { fakeWatch *watch.FakeWatcher requestedLabelSelector labels.Selector requestedFieldSelector fields.Selector + requestedUninitialized bool requestedResourceVersion string requestedResourceNamespace string @@ -449,6 +450,7 @@ func (storage *SimpleRESTStorage) List(ctx request.Context, options *metainterna if options != nil && options.FieldSelector != nil { storage.requestedFieldSelector = options.FieldSelector } + storage.requestedUninitialized = options.IncludeUninitialized return result, storage.errors["list"] } @@ -522,7 +524,7 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object { return &genericapitesting.SimpleList{} } -func (storage *SimpleRESTStorage) Create(ctx request.Context, obj runtime.Object) (runtime.Object, error) { +func (storage *SimpleRESTStorage) Create(ctx request.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { storage.checkContext(ctx) storage.created = obj.(*genericapitesting.Simple) if err := storage.errors["create"]; err != nil { @@ -717,7 +719,7 @@ type NamedCreaterRESTStorage struct { createdName string } -func (storage *NamedCreaterRESTStorage) Create(ctx request.Context, name string, obj runtime.Object) (runtime.Object, error) { +func (storage *NamedCreaterRESTStorage) Create(ctx request.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { storage.checkContext(ctx) storage.created = obj.(*genericapitesting.Simple) storage.createdName = name @@ -1470,6 +1472,52 @@ func TestGet(t *testing.T) { } } +func TestGetUninitialized(t *testing.T) { + storage := map[string]rest.Storage{} + simpleStorage := SimpleRESTStorage{ + list: []genericapitesting.Simple{ + { + ObjectMeta: metav1.ObjectMeta{ + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "test"}}, + }, + }, + Other: "foo", + }, + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id", + alternativeSet: sets.NewString("/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple"), + name: "id", + namespace: "default", + } + storage["simple"] = &simpleStorage + handler := handleLinker(storage, selfLinker) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple?includeUninitialized=true") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut genericapitesting.SimpleList + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(itemOut.Items) != 1 || itemOut.Items[0].Other != "foo" { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !simpleStorage.requestedUninitialized { + t.Errorf("Didn't set correct flag") + } +} + func TestGetPretty(t *testing.T) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{ diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index 578d1c95ff2..e57b694b61f 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -449,13 +449,12 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object } } + // TODO: replace with content type negotiation? + includeUninitialized := req.URL.Query().Get("includeUninitialized") == "1" + trace.Step("About to store object in database") result, err := finishRequest(timeout, func() (runtime.Object, error) { - out, err := r.Create(ctx, name, obj) - if status, ok := out.(*metav1.Status); ok && err == nil && status.Code == 0 { - status.Code = http.StatusCreated - } - return out, err + return r.Create(ctx, name, obj, includeUninitialized) }) if err != nil { scope.err(err, w, req) @@ -474,7 +473,19 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object } trace.Step("Self-link added") - transformResponseObject(ctx, scope, req, w, http.StatusCreated, result) + // If the object is partially initialized, always indicate it via StatusAccepted + code := http.StatusCreated + if accessor, err := meta.Accessor(result); err == nil { + if accessor.GetInitializers() != nil { + code = http.StatusAccepted + } + } + status, ok := result.(*metav1.Status) + if ok && err == nil && status.Code == 0 { + status.Code = int32(code) + } + + transformResponseObject(ctx, scope, req, w, code, result) } } @@ -492,8 +503,8 @@ type namedCreaterAdapter struct { rest.Creater } -func (c *namedCreaterAdapter) Create(ctx request.Context, name string, obj runtime.Object) (runtime.Object, error) { - return c.Creater.Create(ctx, obj) +func (c *namedCreaterAdapter) Create(ctx request.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { + return c.Creater.Create(ctx, obj, includeUninitialized) } // PatchResource returns a function that will handle a resource patch diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 74ddaee2c67..77b55229e88 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -259,6 +259,7 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection // By default we should serve the request from etcd. options = &metainternalversion.ListOptions{ResourceVersion: ""} } + p.IncludeUninitialized = options.IncludeUninitialized list := e.NewListFunc() if name, ok := p.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { @@ -273,7 +274,7 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection } // Create inserts a new item according to the unique key from the object. -func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { +func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { return nil, err } @@ -319,15 +320,91 @@ func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object) (runti return nil, err } } + if !includeUninitialized { + return e.WaitForInitialized(ctx, out) + } return out, nil } +func (e *Store) WaitForInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + // return early if we don't have initializers, or if they've completed already + accessor, err := meta.Accessor(obj) + if err != nil { + return obj, nil + } + initializers := accessor.GetInitializers() + if initializers == nil { + return obj, nil + } + if result := initializers.Result; result != nil { + return nil, kubeerr.FromObject(result) + } + + key, err := e.KeyFunc(ctx, accessor.GetName()) + if err != nil { + return nil, err + } + w, err := e.Storage.Watch(ctx, key, accessor.GetResourceVersion(), storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + + IncludeUninitialized: true, + }) + if err != nil { + return nil, err + } + defer w.Stop() + + latest := obj + ch := w.ResultChan() + for { + select { + case event, ok := <-ch: + if !ok { + // TODO: should we just expose the partially initialized object? + return nil, kubeerr.NewServerTimeout(e.QualifiedResource, "create", 0) + } + switch event.Type { + case watch.Deleted: + if latest = event.Object; latest != nil { + if accessor, err := meta.Accessor(latest); err == nil { + if initializers := accessor.GetInitializers(); initializers != nil && initializers.Result != nil { + // initialization failed, but we missed the modification event + return nil, kubeerr.FromObject(initializers.Result) + } + } + } + return nil, kubeerr.NewInternalError(fmt.Errorf("object deleted while waiting for creation")) + case watch.Error: + if status, ok := event.Object.(*metav1.Status); ok { + return nil, &kubeerr.StatusError{ErrStatus: *status} + } + return nil, kubeerr.NewInternalError(fmt.Errorf("unexpected object in watch stream, can't complete initialization %T", event.Object)) + case watch.Modified: + latest = event.Object + accessor, err = meta.Accessor(latest) + if err != nil { + return nil, kubeerr.NewInternalError(fmt.Errorf("object no longer has access to metadata %T: %v", latest, err)) + } + initializers := accessor.GetInitializers() + if initializers == nil { + // completed initialization + return latest, nil + } + if result := initializers.Result; result != nil { + // initialization failed + return nil, kubeerr.FromObject(result) + } + } + case <-ctx.Done(): + } + } +} + // shouldDeleteDuringUpdate checks if a Update is removing all the object's // finalizers. If so, it further checks if the object's -// DeletionGracePeriodSeconds is 0. If so, it returns true. -// -// If the store does not have garbage collection enabled, -// shouldDeleteDuringUpdate will always return false. +// DeletionGracePeriodSeconds is 0. If so, it returns true. If garbage collection +// is disabled it always returns false. func (e *Store) shouldDeleteDuringUpdate(ctx genericapirequest.Context, key string, obj, existing runtime.Object) bool { if !e.EnableGarbageCollection { return false @@ -345,9 +422,23 @@ func (e *Store) shouldDeleteDuringUpdate(ctx genericapirequest.Context, key stri return len(newMeta.GetFinalizers()) == 0 && oldMeta.GetDeletionGracePeriodSeconds() != nil && *oldMeta.GetDeletionGracePeriodSeconds() == 0 } -// deleteForEmptyFinalizers handles deleting an object once its finalizer list -// becomes empty due to an update. -func (e *Store) deleteForEmptyFinalizers(ctx genericapirequest.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions) (runtime.Object, bool, error) { +// shouldDeleteForFailedInitialization returns true if the provided object is initializing and has +// a failure recorded. +func (e *Store) shouldDeleteForFailedInitialization(ctx genericapirequest.Context, obj runtime.Object) bool { + m, err := meta.Accessor(obj) + if err != nil { + utilruntime.HandleError(err) + return false + } + if initializers := m.GetInitializers(); initializers != nil && initializers.Result != nil { + return true + } + return false +} + +// deleteWithoutFinalizers handles deleting an object ignoring its finalizer list. +// Used for objects that are either been finalized or have never initialized. +func (e *Store) deleteWithoutFinalizers(ctx genericapirequest.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions) (runtime.Object, bool, error) { out := e.NewFunc() glog.V(6).Infof("going to delete %s from registry, triggered by update", name) if err := e.Storage.Delete(ctx, key, out, preconditions); err != nil { @@ -477,7 +568,7 @@ func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest. if err != nil { // delete the object if err == errEmptiedFinalizers { - return e.deleteForEmptyFinalizers(ctx, name, key, deleteObj, storagePreconditions) + return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions) } if creating { err = storeerr.InterpretCreateError(err, e.QualifiedResource, name) @@ -487,6 +578,11 @@ func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest. } return nil, false, err } + + if e.shouldDeleteForFailedInitialization(ctx, out) { + return e.deleteWithoutFinalizers(ctx, name, key, out, storagePreconditions) + } + if creating { if e.AfterCreate != nil { if err := e.AfterCreate(out); err != nil { @@ -1025,11 +1121,14 @@ func (e *Store) Watch(ctx genericapirequest.Context, options *metainternalversio if options != nil && options.FieldSelector != nil { field = options.FieldSelector } + predicate := e.PredicateFunc(label, field) + resourceVersion := "" if options != nil { resourceVersion = options.ResourceVersion + predicate.IncludeUninitialized = options.IncludeUninitialized } - return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion) + return e.WatchPredicate(ctx, predicate, resourceVersion) } // WatchPredicate starts a watch for the items that m matches. diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index d589d57256b..d0443115efd 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -117,9 +118,9 @@ func NewTestGenericStoreRegistry(t *testing.T) (factory.DestroyFunc, *Store) { return newTestGenericStoreRegistry(t, scheme, false) } -func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return labels.Set{"name": pod.ObjectMeta.Name}, nil, nil + return labels.Set{"name": pod.ObjectMeta.Name}, nil, pod.Initializers != nil, nil } // matchPodName returns selection predicate that matches any pod with name in the set. @@ -142,8 +143,8 @@ func matchEverything() storage.SelectionPredicate { return storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) { - return nil, nil, nil + GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) { + return nil, nil, false, nil }, } } @@ -238,7 +239,7 @@ func TestStoreListResourceVersion(t *testing.T) { destroyFunc, registry := newTestGenericStoreRegistry(t, scheme, true) defer destroyFunc() - obj, err := registry.Create(ctx, fooPod) + obj, err := registry.Create(ctx, fooPod, false) if err != nil { t.Fatal(err) } @@ -268,7 +269,7 @@ func TestStoreListResourceVersion(t *testing.T) { t.Fatalf("expected waiting, but get %#v", l) } - if _, err := registry.Create(ctx, barPod); err != nil { + if _, err := registry.Create(ctx, barPod, false); err != nil { t.Fatal(err) } @@ -305,7 +306,7 @@ func TestStoreCreate(t *testing.T) { registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy} // create the object - objA, err := registry.Create(testContext, podA) + objA, err := registry.Create(testContext, podA, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -322,7 +323,7 @@ func TestStoreCreate(t *testing.T) { } // now try to create the second pod - _, err = registry.Create(testContext, podB) + _, err = registry.Create(testContext, podB, false) if !errors.IsAlreadyExists(err) { t.Errorf("Unexpected error: %v", err) } @@ -341,7 +342,7 @@ func TestStoreCreate(t *testing.T) { } // try to create before graceful deletion period is over - _, err = registry.Create(testContext, podA) + _, err = registry.Create(testContext, podA, false) if err == nil || !errors.IsAlreadyExists(err) { t.Fatalf("Expected 'already exists' error from storage, but got %v", err) } @@ -353,6 +354,208 @@ func TestStoreCreate(t *testing.T) { } } +func isPendingInitialization(obj metav1.Object) bool { + return obj.GetInitializers() != nil && obj.GetInitializers().Result == nil && len(obj.GetInitializers().Pending) > 0 +} + +func hasInitializers(obj metav1.Object, expected ...string) bool { + if !isPendingInitialization(obj) { + return false + } + if len(expected) != len(obj.GetInitializers().Pending) { + return false + } + for i, init := range obj.GetInitializers().Pending { + if init.Name != expected[i] { + return false + } + } + return true +} + +func isFailedInitialization(obj metav1.Object) bool { + return obj.GetInitializers() != nil && obj.GetInitializers().Result != nil && obj.GetInitializers().Result.Status == metav1.StatusFailure +} + +func isInitialized(obj metav1.Object) bool { + return obj.GetInitializers() == nil +} + +func TestStoreCreateInitialized(t *testing.T) { + podA := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", Namespace: "test", + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + }, + }, + Spec: example.PodSpec{NodeName: "machine"}, + } + + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + ch := make(chan struct{}) + chObserver := make(chan struct{}) + + // simulate a background initializer that initializes the object + early := make(chan struct{}, 1) + go func() { + defer close(ch) + w, err := registry.Watch(ctx, &metainternalversion.ListOptions{ + IncludeUninitialized: true, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"), + }) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + event := <-w.ResultChan() + pod := event.Object.(*example.Pod) + if event.Type != watch.Added || !hasInitializers(pod, "Test") { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + + select { + case <-early: + t.Fatalf("CreateInitialized should not have returned") + default: + } + + pod.Initializers = nil + updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) + if err != nil { + t.Fatal(err) + } + pod = updated.(*example.Pod) + if !isInitialized(pod) { + t.Fatalf("unexpected update: %#v", pod.Initializers) + } + + event = <-w.ResultChan() + if event.Type != watch.Modified || !isInitialized(event.Object.(*example.Pod)) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + }() + + // create a background worker that should only observe the final creation + go func() { + defer close(chObserver) + w, err := registry.Watch(ctx, &metainternalversion.ListOptions{ + IncludeUninitialized: false, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"), + }) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + event := <-w.ResultChan() + pod := event.Object.(*example.Pod) + if event.Type != watch.Added || !isInitialized(pod) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + }() + + // create the object + objA, err := registry.Create(ctx, podA, false) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // signal that we're now waiting, then wait for both observers to see + // the result of the create. + early <- struct{}{} + <-ch + <-chObserver + + // get the object + checkobj, err := registry.Get(ctx, podA.Name, &metav1.GetOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // verify objects are equal + if e, a := objA, checkobj; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} + +func TestStoreCreateInitializedFailed(t *testing.T) { + podA := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", Namespace: "test", + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + }, + }, + Spec: example.PodSpec{NodeName: "machine"}, + } + + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + ch := make(chan struct{}) + go func() { + w, err := registry.Watch(ctx, &metainternalversion.ListOptions{ + IncludeUninitialized: true, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"), + }) + if err != nil { + t.Fatal(err) + } + event := <-w.ResultChan() + pod := event.Object.(*example.Pod) + if event.Type != watch.Added || !hasInitializers(pod, "Test") { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + pod.Initializers.Pending = nil + pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure, Code: 403, Reason: metav1.StatusReasonForbidden, Message: "induced failure"} + updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) + if err != nil { + t.Fatal(err) + } + pod = updated.(*example.Pod) + if !isFailedInitialization(pod) { + t.Fatalf("unexpected update: %#v", pod.Initializers) + } + + event = <-w.ResultChan() + if event.Type != watch.Modified || !isFailedInitialization(event.Object.(*example.Pod)) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + + event = <-w.ResultChan() + if event.Type != watch.Deleted || !isFailedInitialization(event.Object.(*example.Pod)) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + w.Stop() + close(ch) + }() + + // create the object + _, err := registry.Create(ctx, podA, false) + if !errors.IsForbidden(err) { + t.Fatalf("unexpected error: %#v", err.(errors.APIStatus).Status()) + } + if err.(errors.APIStatus).Status().Message != "induced failure" { + t.Fatalf("unexpected error: %#v", err) + } + + <-ch + + // get the object + _, err = registry.Get(ctx, podA.Name, &metav1.GetOptions{}) + if !errors.IsNotFound(err) { + t.Fatalf("Unexpected error: %v", err) + } +} + func updateAndVerify(t *testing.T, ctx genericapirequest.Context, registry *Store, pod *example.Pod) bool { obj, _, err := registry.Update(ctx, pod.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) if err != nil { @@ -440,7 +643,7 @@ func TestNoOpUpdates(t *testing.T) { var err error var createResult runtime.Object - if createResult, err = registry.Create(genericapirequest.NewDefaultContext(), newPod()); err != nil { + if createResult, err = registry.Create(genericapirequest.NewDefaultContext(), newPod(), false); err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -615,7 +818,7 @@ func TestStoreDelete(t *testing.T) { } // create pod - _, err = registry.Create(testContext, podA) + _, err = registry.Create(testContext, podA, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -687,7 +890,7 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) { registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy} defer destroyFunc() // create pod - _, err := registry.Create(testContext, podWithFinalizer) + _, err := registry.Create(testContext, podWithFinalizer, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -735,6 +938,43 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) { } } +func TestFailedInitializationStoreUpdate(t *testing.T) { + initialGeneration := int64(1) + podInitializing := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "Test"}}}, Generation: initialGeneration}, + Spec: example.PodSpec{NodeName: "machine"}, + } + + testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + registry.EnableGarbageCollection = true + defaultDeleteStrategy := testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true} + registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy} + defer destroyFunc() + + // create pod, view initializing + obj, err := registry.Create(testContext, podInitializing, true) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + pod := obj.(*example.Pod) + + // update the pod with initialization failure, the pod should be deleted + pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure} + result, _, err := registry.Update(testContext, podInitializing.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = registry.Get(testContext, podInitializing.Name, &metav1.GetOptions{}) + if err == nil || !errors.IsNotFound(err) { + t.Fatalf("Unexpected error: %v", err) + } + pod = result.(*example.Pod) + if pod.Initializers == nil || pod.Initializers.Result == nil || pod.Initializers.Result.Status != metav1.StatusFailure { + t.Fatalf("Pod returned from update was not correct: %#v", pod) + } +} + func TestNonGracefulStoreHandleFinalizers(t *testing.T) { initialGeneration := int64(1) podWithFinalizer := &example.Pod{ @@ -747,7 +987,7 @@ func TestNonGracefulStoreHandleFinalizers(t *testing.T) { registry.EnableGarbageCollection = true defer destroyFunc() // create pod - _, err := registry.Create(testContext, podWithFinalizer) + _, err := registry.Create(testContext, podWithFinalizer, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -1048,7 +1288,7 @@ func TestStoreDeleteWithOrphanDependents(t *testing.T) { for _, tc := range testcases { registry.DeleteStrategy = tc.strategy // create pod - _, err := registry.Create(testContext, tc.pod) + _, err := registry.Create(testContext, tc.pod, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1267,7 +1507,7 @@ func TestStoreDeletionPropagation(t *testing.T) { i++ pod := createPod(i, tc.existingFinalizers) // create pod - _, err := registry.Create(testContext, pod) + _, err := registry.Create(testContext, pod, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1311,10 +1551,10 @@ func TestStoreDeleteCollection(t *testing.T) { destroyFunc, registry := NewTestGenericStoreRegistry(t) defer destroyFunc() - if _, err := registry.Create(testContext, podA); err != nil { + if _, err := registry.Create(testContext, podA, false); err != nil { t.Errorf("Unexpected error: %v", err) } - if _, err := registry.Create(testContext, podB); err != nil { + if _, err := registry.Create(testContext, podB, false); err != nil { t.Errorf("Unexpected error: %v", err) } @@ -1347,10 +1587,10 @@ func TestStoreDeleteCollectionNotFound(t *testing.T) { for i := 0; i < 10; i++ { // Setup - if _, err := registry.Create(testContext, podA); err != nil { + if _, err := registry.Create(testContext, podA, false); err != nil { t.Errorf("Unexpected error: %v", err) } - if _, err := registry.Create(testContext, podB); err != nil { + if _, err := registry.Create(testContext, podB, false); err != nil { t.Errorf("Unexpected error: %v", err) } @@ -1386,7 +1626,7 @@ func TestStoreDeleteCollectionWithWatch(t *testing.T) { destroyFunc, registry := NewTestGenericStoreRegistry(t) defer destroyFunc() - objCreated, err := registry.Create(testContext, podA) + objCreated, err := registry.Create(testContext, podA, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1455,7 +1695,7 @@ func TestStoreWatch(t *testing.T) { if err != nil { t.Errorf("%v: unexpected error: %v", name, err) } else { - obj, err := registry.Create(testContext, podA) + obj, err := registry.Create(testContext, podA, false) if err != nil { got, open := <-wi.ResultChan() if !open { @@ -1530,12 +1770,12 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE return storage.SelectionPredicate{ Label: label, Field: field, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod, ok := obj.(*example.Pod) if !ok { - return nil, nil, fmt.Errorf("not a pod") + return nil, nil, false, fmt.Errorf("not a pod") } - return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), nil + return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), pod.Initializers != nil, nil }, } }, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index 015754f8602..4421ffcac34 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -174,8 +174,9 @@ type Creater interface { // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) New() runtime.Object - // Create creates a new version of a resource. - Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) + // Create creates a new version of a resource. If includeUninitialized is set, the object may be returned + // without completing initialization. + Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) } // NamedCreater is an object that can create an instance of a RESTful object using a name parameter. @@ -186,8 +187,9 @@ type NamedCreater interface { // Create creates a new version of a resource. It expects a name parameter from the path. // This is needed for create operations on subresources which include the name of the parent - // resource in the path. - Create(ctx genericapirequest.Context, name string, obj runtime.Object) (runtime.Object, error) + // resource in the path. If includeUninitialized is set, the object may be returned without + // completing initialization. + Create(ctx genericapirequest.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) } // UpdatedObjectInfo provides information about an updated object to an Updater. diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go index be2fb94fe38..f1e2a1bad12 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go @@ -251,7 +251,7 @@ func (t *Tester) testCreateAlreadyExisting(obj runtime.Object, createFn CreateFu } defer t.delete(ctx, foo) - _, err := t.storage.(rest.Creater).Create(ctx, foo) + _, err := t.storage.(rest.Creater).Create(ctx, foo, false) if !errors.IsAlreadyExists(err) { t.Errorf("expected already exists err, got %v", err) } @@ -263,7 +263,7 @@ func (t *Tester) testCreateEquals(obj runtime.Object, getFn GetFunc) { foo := copyOrDie(obj, t.scheme) t.setObjectMeta(foo, t.namer(2)) - created, err := t.storage.(rest.Creater).Create(ctx, foo) + created, err := t.storage.(rest.Creater).Create(ctx, foo, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -291,7 +291,7 @@ func (t *Tester) testCreateDiscardsObjectNamespace(valid runtime.Object) { objectMeta.SetNamespace("not-default") // Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted - created, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme)) + created, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -307,7 +307,7 @@ func (t *Tester) testCreateGeneratesName(valid runtime.Object) { objectMeta.SetName("") objectMeta.SetGenerateName("test-") - created, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + created, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -322,7 +322,7 @@ func (t *Tester) testCreateHasMetadata(valid runtime.Object) { objectMeta.SetName(t.namer(1)) objectMeta.SetNamespace(t.TestNamespace()) - obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -340,7 +340,7 @@ func (t *Tester) testCreateIgnoresContextNamespace(valid runtime.Object) { ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2") // Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted - created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme)) + created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -359,7 +359,7 @@ func (t *Tester) testCreateIgnoresMismatchedNamespace(valid runtime.Object) { ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2") // Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted - created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme)) + created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -377,7 +377,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) { objCopyMeta.SetName(invalidName) ctx := t.TestContext() - _, err := t.storage.(rest.Creater).Create(ctx, objCopy) + _, err := t.storage.(rest.Creater).Create(ctx, objCopy, false) if !errors.IsInvalid(err) { t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidName, err) } @@ -389,7 +389,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) { objCopyMeta.SetName(objCopyMeta.GetName() + invalidSuffix) ctx := t.TestContext() - _, err := t.storage.(rest.Creater).Create(ctx, objCopy) + _, err := t.storage.(rest.Creater).Create(ctx, objCopy, false) if !errors.IsInvalid(err) { t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidSuffix, err) } @@ -399,7 +399,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) { func (t *Tester) testCreateInvokesValidation(invalid ...runtime.Object) { for i, obj := range invalid { ctx := t.TestContext() - _, err := t.storage.(rest.Creater).Create(ctx, obj) + _, err := t.storage.(rest.Creater).Create(ctx, obj, false) if !errors.IsInvalid(err) { t.Errorf("%d: Expected to get an invalid resource error, got %v", i, err) } @@ -410,7 +410,7 @@ func (t *Tester) testCreateRejectsMismatchedNamespace(valid runtime.Object) { objectMeta := t.getObjectMetaOrFail(valid) objectMeta.SetNamespace("not-default") - _, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + _, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err == nil { t.Errorf("Expected an error, but we didn't get one") } else if !strings.Contains(err.Error(), "does not match the namespace sent on the request") { @@ -424,7 +424,7 @@ func (t *Tester) testCreateResetsUserData(valid runtime.Object) { objectMeta.SetUID("bad-uid") objectMeta.SetCreationTimestamp(now) - obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -442,7 +442,7 @@ func (t *Tester) testCreateIgnoreClusterName(valid runtime.Object) { objectMeta.SetName(t.namer(3)) objectMeta.SetClusterName("clustername-to-ignore") - obj, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme)) + obj, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1071,14 +1071,14 @@ func (t *Tester) testGetDifferentNamespace(obj runtime.Object) { ctx1 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar3") objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1)) - _, err := t.storage.(rest.Creater).Create(ctx1, obj) + _, err := t.storage.(rest.Creater).Create(ctx1, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } ctx2 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar4") objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx2)) - _, err = t.storage.(rest.Creater).Create(ctx2, obj) + _, err = t.storage.(rest.Creater).Create(ctx2, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1112,7 +1112,7 @@ func (t *Tester) testGetFound(obj runtime.Object) { ctx := t.TestContext() t.setObjectMeta(obj, t.namer(1)) - existing, err := t.storage.(rest.Creater).Create(ctx, obj) + existing, err := t.storage.(rest.Creater).Create(ctx, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1135,7 +1135,7 @@ func (t *Tester) testGetMimatchedNamespace(obj runtime.Object) { objMeta := t.getObjectMetaOrFail(obj) objMeta.SetName(t.namer(4)) objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1)) - _, err := t.storage.(rest.Creater).Create(ctx1, obj) + _, err := t.storage.(rest.Creater).Create(ctx1, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1154,7 +1154,7 @@ func (t *Tester) testGetMimatchedNamespace(obj runtime.Object) { func (t *Tester) testGetNotFound(obj runtime.Object) { ctx := t.TestContext() t.setObjectMeta(obj, t.namer(2)) - _, err := t.storage.(rest.Creater).Create(ctx, obj) + _, err := t.storage.(rest.Creater).Create(ctx, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go index 760f4fc3da7..0a86aefa078 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go @@ -41,7 +41,7 @@ type AdmissionOptions struct { func NewAdmissionOptions() *AdmissionOptions { options := &AdmissionOptions{ Plugins: &admission.Plugins{}, - PluginNames: []string{}, + PluginNames: []string{"Initializers"}, } server.RegisterAllAdmissionPlugins(options.Plugins) return options diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index ac5e0160822..5cc9e93e152 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -62,8 +62,8 @@ type CacherConfig struct { // KeyFunc is used to get a key in the underlying storage for a given object. KeyFunc func(runtime.Object) (string, error) - // GetAttrsFunc is used to get object labels and fields. - GetAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) + // GetAttrsFunc is used to get object labels, fields, and the uninitialized bool + GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) // TriggerPublisherFunc is used for optimizing amount of watchers that // needs to process an incoming event. @@ -131,7 +131,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) { } } -type watchFilterFunc func(string, labels.Set, fields.Set) bool +type watchFilterFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background @@ -658,11 +658,11 @@ func filterFunction(key string, p SelectionPredicate) func(string, runtime.Objec } func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc { - filterFunc := func(objKey string, label labels.Set, field fields.Set) bool { + filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool { if !hasPathPrefix(objKey, key) { return false } - return p.MatchesLabelsAndFields(label, field) + return p.MatchesObjectAttributes(label, field, uninitialized) } return filterFunc } @@ -840,10 +840,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) { // NOTE: sendWatchCacheEvent is assumed to not modify !!! func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { - curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) + curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized) oldObjPasses := false if event.PrevObject != nil { - oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields) + oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized) } if !curObjPasses && !oldObjPasses { // Watcher is not interested in that object. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go index 9a80b5f0da1..b8973d0b685 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go @@ -37,7 +37,7 @@ import ( func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { var lock sync.RWMutex count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, bool) bool { return true } forget := func(bool) { lock.Lock() defer lock.Unlock() @@ -61,7 +61,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } func TestCacheWatcherHandlesFiltering(t *testing.T) { - filter := func(_ string, _ labels.Set, field fields.Set) bool { + filter := func(_ string, _ labels.Set, field fields.Set, _ bool) bool { return field["spec.nodeName"] == "host" } forget := func(bool) {} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go index 9cc29673509..c6b046a4a76 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go @@ -249,9 +249,9 @@ func TestListFiltered(t *testing.T) { p := storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil + return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, } var got example.PodList diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 77f397e8046..cc1d76c7789 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -285,9 +285,9 @@ func TestGetToList(t *testing.T) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, expectedOut: nil, @@ -644,9 +644,9 @@ func TestList(t *testing.T) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, expectedOut: nil, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index cb172564d3d..50b46371021 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -118,7 +118,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re resultChan: make(chan watch.Event, outgoingBufSize), errChan: make(chan error, 1), } - if pred.Label.Empty() && pred.Field.Empty() { + if pred.Empty() { // The filter doesn't filter out any object. wc.internalFilter = nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 58cbfe4c29b..052e4dc73a9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -73,9 +73,9 @@ func testWatch(t *testing.T, recursive bool) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name=bar"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, }, { // update @@ -88,9 +88,9 @@ func testWatch(t *testing.T, recursive bool) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=bar"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, }} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index e8181c3e848..74abfdc3e0c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -72,6 +72,8 @@ type FilterFunc func(obj runtime.Object) bool var Everything = SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), + // TODO: split this into a new top level constant? + IncludeUninitialized: true, } // Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index 8878245d1f2..3a345303b7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -22,29 +22,33 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) -// AttrFunc returns label and field sets for List or Watch to match. +// AttrFunc returns label and field sets and the uninitialized flag for List or Watch to match. // In any failure to parse given object, it returns error. -type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, error) +type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, bool, error) // SelectionPredicate is used to represent the way to select objects from api storage. type SelectionPredicate struct { - Label labels.Selector - Field fields.Selector - GetAttrs AttrFunc - IndexFields []string + Label labels.Selector + Field fields.Selector + IncludeUninitialized bool + GetAttrs AttrFunc + IndexFields []string } // Matches returns true if the given object's labels and fields (as // returned by s.GetAttrs) match s.Label and s.Field. An error is // returned if s.GetAttrs fails. func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { - if s.Label.Empty() && s.Field.Empty() { + if s.Empty() { return true, nil } - labels, fields, err := s.GetAttrs(obj) + labels, fields, uninitialized, err := s.GetAttrs(obj) if err != nil { return false, err } + if !s.IncludeUninitialized && uninitialized { + return false, nil + } matched := s.Label.Matches(labels) if matched && s.Field != nil { matched = (matched && s.Field.Matches(fields)) @@ -52,9 +56,12 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { return matched, nil } -// MatchesLabelsAndFields returns true if the given labels and fields +// MatchesObjectAttributes returns true if the given labels and fields // match s.Label and s.Field. -func (s *SelectionPredicate) MatchesLabelsAndFields(l labels.Set, f fields.Set) bool { +func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set, uninitialized bool) bool { + if !s.IncludeUninitialized && uninitialized { + return false + } if s.Label.Empty() && s.Field.Empty() { return true } @@ -93,10 +100,11 @@ func (s *SelectionPredicate) RemoveMatchesSingleRequirements() (SelectionPredica } } return SelectionPredicate{ - Label: s.Label, - Field: fieldsSelector, - GetAttrs: s.GetAttrs, - IndexFields: s.IndexFields, + Label: s.Label, + Field: fieldsSelector, + IncludeUninitialized: s.IncludeUninitialized, + GetAttrs: s.GetAttrs, + IndexFields: s.IndexFields, }, nil } @@ -113,3 +121,8 @@ func (s *SelectionPredicate) MatcherIndex() []MatchValue { } return result } + +// Empty returns true if the predicate performs no filtering. +func (s *SelectionPredicate) Empty() bool { + return s.Label.Empty() && s.Field.Empty() && s.IncludeUninitialized +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go index d51666e21be..3c5da649a63 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go @@ -42,6 +42,7 @@ func TestSelectionPredicate(t *testing.T) { labelSelector, fieldSelector string labels labels.Set fields fields.Set + uninitialized bool err error shouldMatch bool matchSingleKey string @@ -74,6 +75,14 @@ func TestSelectionPredicate(t *testing.T) { shouldMatch: true, matchSingleKey: "12345", }, + "E": { + fieldSelector: "metadata.name=12345", + labels: labels.Set{}, + fields: fields.Set{"metadata.name": "12345"}, + uninitialized: true, + shouldMatch: false, + matchSingleKey: "12345", + }, "error": { labelSelector: "name=foo", fieldSelector: "uid=12345", @@ -94,8 +103,8 @@ func TestSelectionPredicate(t *testing.T) { sp := &SelectionPredicate{ Label: parsedLabel, Field: parsedField, - GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, err error) { - return item.labels, item.fields, item.err + GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) { + return item.labels, item.fields, item.uninitialized, item.err }, } got, err := sp.Matches(&Ignored{}) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 619c3ea8368..ef0c4f38de7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -61,12 +61,12 @@ func init() { } // GetAttrs returns labels and fields of a given object for filtering purposes. -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod, ok := obj.(*example.Pod) if !ok { - return nil, nil, fmt.Errorf("not a pod") + return nil, nil, false, fmt.Errorf("not a pod") } - return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil + return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), pod.Initializers != nil, nil } // PodToSelectableFields returns a field set that represents the object @@ -469,12 +469,12 @@ func TestFiltering(t *testing.T) { pred := storage.SelectionPredicate{ Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}), Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) { + GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) { metadata, err := meta.Accessor(obj) if err != nil { t.Fatalf("Unexpected error: %v", err) } - return labels.Set(metadata.GetLabels()), nil, nil + return labels.Set(metadata.GetLabels()), nil, metadata.GetInitializers() != nil, nil }, } watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go index beca63488d8..1268b9d7a51 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go @@ -47,15 +47,17 @@ const ( // the previous value of the object to enable proper filtering in the // upper layers. type watchCacheEvent struct { - Type watch.EventType - Object runtime.Object - ObjLabels labels.Set - ObjFields fields.Set - PrevObject runtime.Object - PrevObjLabels labels.Set - PrevObjFields fields.Set - Key string - ResourceVersion uint64 + Type watch.EventType + Object runtime.Object + ObjLabels labels.Set + ObjFields fields.Set + ObjUninitialized bool + PrevObject runtime.Object + PrevObjLabels labels.Set + PrevObjFields fields.Set + PrevObjUninitialized bool + Key string + ResourceVersion uint64 } // Computing a key of an object is generally non-trivial (it performs @@ -102,7 +104,7 @@ type watchCache struct { keyFunc func(runtime.Object) (string, error) // getAttrsFunc is used to get labels and fields of an object. - getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) + getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error) // cache is used a cyclic buffer - its first element (with the smallest // resourceVersion) is defined by startIndex, its last element is defined @@ -136,7 +138,7 @@ type watchCache struct { func newWatchCache( capacity int, keyFunc func(runtime.Object) (string, error), - getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)) *watchCache { + getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache { wc := &watchCache{ capacity: capacity, keyFunc: keyFunc, @@ -229,30 +231,33 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if err != nil { return err } - objLabels, objFields, err := w.getAttrsFunc(event.Object) + objLabels, objFields, objUninitialized, err := w.getAttrsFunc(event.Object) if err != nil { return err } var prevObject runtime.Object var prevObjLabels labels.Set var prevObjFields fields.Set + var prevObjUninitialized bool if exists { prevObject = previous.(*storeElement).Object - prevObjLabels, prevObjFields, err = w.getAttrsFunc(prevObject) + prevObjLabels, prevObjFields, prevObjUninitialized, err = w.getAttrsFunc(prevObject) if err != nil { return err } } watchCacheEvent := &watchCacheEvent{ - Type: event.Type, - Object: event.Object, - ObjLabels: objLabels, - ObjFields: objFields, - PrevObject: prevObject, - PrevObjLabels: prevObjLabels, - PrevObjFields: prevObjFields, - Key: key, - ResourceVersion: resourceVersion, + Type: event.Type, + Object: event.Object, + ObjLabels: objLabels, + ObjFields: objFields, + ObjUninitialized: objUninitialized, + PrevObject: prevObject, + PrevObjLabels: prevObjLabels, + PrevObjFields: prevObjFields, + PrevObjUninitialized: prevObjUninitialized, + Key: key, + ResourceVersion: resourceVersion, } if w.onEvent != nil { w.onEvent(watchCacheEvent) @@ -425,17 +430,18 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w if !ok { return nil, fmt.Errorf("not a storeElement: %v", elem) } - objLabels, objFields, err := w.getAttrsFunc(elem.Object) + objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object) if err != nil { return nil, err } result[i] = &watchCacheEvent{ - Type: watch.Added, - Object: elem.Object, - ObjLabels: objLabels, - ObjFields: objFields, - Key: elem.Key, - ResourceVersion: w.resourceVersion, + Type: watch.Added, + Object: elem.Object, + ObjLabels: objLabels, + ObjFields: objFields, + ObjUninitialized: objUninitialized, + Key: elem.Key, + ResourceVersion: w.resourceVersion, } } return result, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go index 744326d5ba6..9f8a360e00e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go @@ -50,8 +50,8 @@ func newTestWatchCache(capacity int) *watchCache { keyFunc := func(obj runtime.Object) (string, error) { return NamespaceKeyFunc("prefix", obj) } - getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) { - return nil, nil, nil + getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + return nil, nil, false, nil } wc := newWatchCache(capacity, keyFunc, getAttrsFunc) wc.clock = clock.NewFakeClock(time.Now()) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go index a07d5439426..fdcb6637f76 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go @@ -113,12 +113,12 @@ func (apiServerStatusStrategy) ValidateUpdate(ctx genericapirequest.Context, obj return validation.ValidateAPIServiceStatusUpdate(obj.(*apiregistration.APIService), old.(*apiregistration.APIService)) } -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { apiserver, ok := obj.(*apiregistration.APIService) if !ok { - return nil, nil, fmt.Errorf("given object is not a APIService.") + return nil, nil, false, fmt.Errorf("given object is not a APIService.") } - return labels.Set(apiserver.ObjectMeta.Labels), APIServiceToSelectableFields(apiserver), nil + return labels.Set(apiserver.ObjectMeta.Labels), APIServiceToSelectableFields(apiserver), apiserver.Initializers != nil, nil } // MatchAPIService is the filter used by the generic etcd backend to watch events diff --git a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go index c2b739f3cec..bafe064ac1c 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go +++ b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go @@ -83,12 +83,12 @@ func (CustomResourceDefinitionStorageStrategy) ValidateUpdate(ctx genericapirequ return validation.ValidateObjectMetaAccessorUpdate(objAccessor, oldAccessor, field.NewPath("metadata")) } -func (a CustomResourceDefinitionStorageStrategy) GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func (a CustomResourceDefinitionStorageStrategy) GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { accessor, err := meta.Accessor(obj) if err != nil { - return nil, nil, err + return nil, nil, false, err } - return labels.Set(accessor.GetLabels()), objectMetaFieldsSet(accessor, a.namespaceScoped), nil + return labels.Set(accessor.GetLabels()), objectMetaFieldsSet(accessor, a.namespaceScoped), accessor.GetInitializers() != nil, nil } // objectMetaFieldsSet returns a fields that represent the ObjectMeta. diff --git a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go index 07e77c5204d..e802102bbe6 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go +++ b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go @@ -107,12 +107,12 @@ func (statusStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old run return validation.ValidateUpdateCustomResourceDefinitionStatus(obj.(*apiextensions.CustomResourceDefinition), old.(*apiextensions.CustomResourceDefinition)) } -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { apiserver, ok := obj.(*apiextensions.CustomResourceDefinition) if !ok { - return nil, nil, fmt.Errorf("given object is not a CustomResourceDefinition.") + return nil, nil, false, fmt.Errorf("given object is not a CustomResourceDefinition.") } - return labels.Set(apiserver.ObjectMeta.Labels), CustomResourceDefinitionToSelectableFields(apiserver), nil + return labels.Set(apiserver.ObjectMeta.Labels), CustomResourceDefinitionToSelectableFields(apiserver), apiserver.Initializers != nil, nil } // MatchCustomResourceDefinition is the filter used by the generic etcd backend to watch events diff --git a/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go b/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go index 12b1a6a8172..24a4cb16cb3 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go @@ -71,12 +71,12 @@ func (apiServerStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old // return validation.ValidateFlunderUpdate(obj.(*wardle.Flunder), old.(*wardle.Flunder)) } -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { apiserver, ok := obj.(*wardle.Flunder) if !ok { - return nil, nil, fmt.Errorf("given object is not a Flunder.") + return nil, nil, false, fmt.Errorf("given object is not a Flunder.") } - return labels.Set(apiserver.ObjectMeta.Labels), FlunderToSelectableFields(apiserver), nil + return labels.Set(apiserver.ObjectMeta.Labels), FlunderToSelectableFields(apiserver), apiserver.Initializers != nil, nil } // MatchFlunder is the filter used by the generic etcd backend to watch events diff --git a/test/e2e/BUILD b/test/e2e/BUILD index 5155a8bd6f4..1896139378e 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -23,6 +23,7 @@ go_test( "//pkg/metrics:go_default_library", "//test/e2e/autoscaling:go_default_library", "//test/e2e/cluster-logging:go_default_library", + "//test/e2e/extension:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e/perf:go_default_library", "//test/e2e/scheduling:go_default_library", @@ -234,6 +235,7 @@ filegroup( "//test/e2e/chaosmonkey:all-srcs", "//test/e2e/cluster-logging:all-srcs", "//test/e2e/common:all-srcs", + "//test/e2e/extension:all-srcs", "//test/e2e/framework:all-srcs", "//test/e2e/generated:all-srcs", "//test/e2e/perf:all-srcs", diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 0d382763696..fe4672e9051 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -21,6 +21,7 @@ import ( _ "k8s.io/kubernetes/test/e2e/autoscaling" _ "k8s.io/kubernetes/test/e2e/cluster-logging" + _ "k8s.io/kubernetes/test/e2e/extension" "k8s.io/kubernetes/test/e2e/framework" _ "k8s.io/kubernetes/test/e2e/perf" _ "k8s.io/kubernetes/test/e2e/scheduling" diff --git a/test/e2e/extension/BUILD b/test/e2e/extension/BUILD new file mode 100644 index 00000000000..bc9bf9064a4 --- /dev/null +++ b/test/e2e/extension/BUILD @@ -0,0 +1,36 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["initializers.go"], + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//test/e2e/framework:go_default_library", + "//vendor/github.com/onsi/ginkgo:go_default_library", + "//vendor/github.com/onsi/gomega:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/test/e2e/extension/initializers.go b/test/e2e/extension/initializers.go new file mode 100644 index 00000000000..49922ac2b3f --- /dev/null +++ b/test/e2e/extension/initializers.go @@ -0,0 +1,121 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package extension + +import ( + "fmt" + "strings" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/test/e2e/framework" +) + +var _ = framework.KubeDescribe("Initializers", func() { + f := framework.NewDefaultFramework("initializers") + + // TODO: Add failure traps once we have JustAfterEach + // See https://github.com/onsi/ginkgo/issues/303 + + It("should be invisible to controllers by default", func() { + ns := f.Namespace.Name + c := f.ClientSet + + podName := "uninitialized-pod" + framework.Logf("Creating pod %s", podName) + + ch := make(chan struct{}) + go func() { + _, err := c.Core().Pods(ns).Create(newUninitializedPod(podName)) + Expect(err).NotTo(HaveOccurred()) + close(ch) + }() + + // wait to ensure the scheduler does not act on an uninitialized pod + err := wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) { + p, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, err + } + return len(p.Spec.NodeName) > 0, nil + }) + Expect(err).To(Equal(wait.ErrWaitTimeout)) + + // verify that we can update an initializing pod + pod, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + pod.Annotations = map[string]string{"update-1": "test"} + pod, err = c.Core().Pods(ns).Update(pod) + Expect(err).NotTo(HaveOccurred()) + + // clear initializers + pod.Initializers = nil + pod, err = c.Core().Pods(ns).Update(pod) + Expect(err).NotTo(HaveOccurred()) + + // pod should now start running + err = framework.WaitForPodRunningInNamespace(c, pod) + Expect(err).NotTo(HaveOccurred()) + + // ensure create call returns + <-ch + + // verify that we cannot start the pod initializing again + pod, err = c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + pod.Initializers = &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Other"}}, + } + _, err = c.Core().Pods(ns).Update(pod) + if !errors.IsInvalid(err) || !strings.Contains(err.Error(), "immutable") { + Fail(fmt.Sprintf("expected invalid error: %v", err)) + } + }) + +}) + +func newUninitializedPod(podName string) *v1.Pod { + containerName := fmt.Sprintf("%s-container", podName) + port := 8080 + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: containerName, + Image: "gcr.io/google_containers/porter:4524579c0eb935c056c8e75563b4e1eda31587e0", + Env: []v1.EnvVar{{Name: fmt.Sprintf("SERVE_PORT_%d", port), Value: "foo"}}, + Ports: []v1.ContainerPort{{ContainerPort: int32(port)}}, + }, + }, + RestartPolicy: v1.RestartPolicyNever, + }, + } + return pod +}