diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index 46a7a00e941..1377ae4916a 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -52,6 +52,7 @@ go_library( "//plugin/pkg/admission/exec:go_default_library", "//plugin/pkg/admission/gc:go_default_library", "//plugin/pkg/admission/imagepolicy:go_default_library", + "//plugin/pkg/admission/initialization:go_default_library", "//plugin/pkg/admission/initialresources:go_default_library", "//plugin/pkg/admission/limitranger:go_default_library", "//plugin/pkg/admission/namespace/autoprovision:go_default_library", diff --git a/cmd/kube-apiserver/app/plugins.go b/cmd/kube-apiserver/app/plugins.go index f287a9a47d0..deab5b6f232 100644 --- a/cmd/kube-apiserver/app/plugins.go +++ b/cmd/kube-apiserver/app/plugins.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/exec" "k8s.io/kubernetes/plugin/pkg/admission/gc" "k8s.io/kubernetes/plugin/pkg/admission/imagepolicy" + "k8s.io/kubernetes/plugin/pkg/admission/initialization" "k8s.io/kubernetes/plugin/pkg/admission/initialresources" "k8s.io/kubernetes/plugin/pkg/admission/limitranger" "k8s.io/kubernetes/plugin/pkg/admission/namespace/autoprovision" @@ -59,6 +60,7 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) { exec.Register(plugins) gc.Register(plugins) imagepolicy.Register(plugins) + initialization.Register(plugins) initialresources.Register(plugins) limitranger.Register(plugins) autoprovision.Register(plugins) diff --git a/federation/cmd/federation-apiserver/app/BUILD b/federation/cmd/federation-apiserver/app/BUILD index 0966608dce0..830442e65d1 100644 --- a/federation/cmd/federation-apiserver/app/BUILD +++ b/federation/cmd/federation-apiserver/app/BUILD @@ -67,6 +67,7 @@ go_library( "//plugin/pkg/admission/admit:go_default_library", "//plugin/pkg/admission/deny:go_default_library", "//plugin/pkg/admission/gc:go_default_library", + "//plugin/pkg/admission/initialization:go_default_library", "//vendor/github.com/go-openapi/spec:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", diff --git a/federation/cmd/federation-apiserver/app/plugins.go b/federation/cmd/federation-apiserver/app/plugins.go index 61c5902eb12..168a0ef6e64 100644 --- a/federation/cmd/federation-apiserver/app/plugins.go +++ b/federation/cmd/federation-apiserver/app/plugins.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/admit" "k8s.io/kubernetes/plugin/pkg/admission/deny" "k8s.io/kubernetes/plugin/pkg/admission/gc" + "k8s.io/kubernetes/plugin/pkg/admission/initialization" ) // registerAllAdmissionPlugins registers all admission plugins @@ -35,4 +36,5 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) { admit.Register(plugins) deny.Register(plugins) gc.Register(plugins) + initialization.Register(plugins) } diff --git a/pkg/registry/authentication/tokenreview/storage.go b/pkg/registry/authentication/tokenreview/storage.go index 3c681ef9d2a..f1ddd5a1572 100644 --- a/pkg/registry/authentication/tokenreview/storage.go +++ b/pkg/registry/authentication/tokenreview/storage.go @@ -76,3 +76,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return tokenReview, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/authorization/localsubjectaccessreview/rest.go b/pkg/registry/authorization/localsubjectaccessreview/rest.go index 379a42b96ba..61d526a39e6 100644 --- a/pkg/registry/authorization/localsubjectaccessreview/rest.go +++ b/pkg/registry/authorization/localsubjectaccessreview/rest.go @@ -69,3 +69,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return localSubjectAccessReview, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/authorization/selfsubjectaccessreview/rest.go b/pkg/registry/authorization/selfsubjectaccessreview/rest.go index 94b4c06d727..1c77f9351bc 100644 --- a/pkg/registry/authorization/selfsubjectaccessreview/rest.go +++ b/pkg/registry/authorization/selfsubjectaccessreview/rest.go @@ -72,3 +72,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return selfSAR, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/authorization/subjectaccessreview/rest.go b/pkg/registry/authorization/subjectaccessreview/rest.go index 5b984b4c8bf..3b54f1f4275 100644 --- a/pkg/registry/authorization/subjectaccessreview/rest.go +++ b/pkg/registry/authorization/subjectaccessreview/rest.go @@ -62,3 +62,7 @@ func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtim return subjectAccessReview, nil } + +func (r *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} diff --git a/pkg/registry/core/pod/storage/eviction.go b/pkg/registry/core/pod/storage/eviction.go index 214680cce0a..acda808a0df 100644 --- a/pkg/registry/core/pod/storage/eviction.go +++ b/pkg/registry/core/pod/storage/eviction.go @@ -145,6 +145,11 @@ func (r *EvictionREST) Create(ctx genericapirequest.Context, obj runtime.Object) return &metav1.Status{Status: metav1.StatusSuccess}, nil } +// CreateInitialized will ensure the pod is evicted. +func (r *EvictionREST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} + // checkAndDecrement checks if the provided PodDisruptionBudget allows any disruption. func (r *EvictionREST) checkAndDecrement(namespace string, podName string, pdb policy.PodDisruptionBudget) (ok bool, err error) { if pdb.Status.ObservedGeneration < pdb.Generation { diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index d4107fff034..79964908ddb 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -143,6 +143,11 @@ func (r *BindingREST) Create(ctx genericapirequest.Context, obj runtime.Object) return } +// CreateInitialized will ensure the pod is bound. +func (r *BindingREST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} + // setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if it was // previously 'oldMachine' and merges the provided annotations with those of the pod. // Returns the current state of the pod, or an error. diff --git a/pkg/registry/core/service/rest.go b/pkg/registry/core/service/rest.go index eadf3050319..cc8ec6472e6 100644 --- a/pkg/registry/core/service/rest.go +++ b/pkg/registry/core/service/rest.go @@ -192,6 +192,11 @@ func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runti return out, err } +// TODO: fix services to support initialization by using generic.Store +func (rs *REST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return rs.Create(ctx, obj) +} + func (rs *REST) Delete(ctx genericapirequest.Context, id string) (runtime.Object, error) { service, err := rs.registry.GetService(ctx, id, &metav1.GetOptions{}) if err != nil { diff --git a/pkg/registry/extensions/deployment/storage/storage.go b/pkg/registry/extensions/deployment/storage/storage.go index 2b65ec9aa9f..22a28882144 100644 --- a/pkg/registry/extensions/deployment/storage/storage.go +++ b/pkg/registry/extensions/deployment/storage/storage.go @@ -144,6 +144,10 @@ func (r *RollbackREST) Create(ctx genericapirequest.Context, obj runtime.Object) }, nil } +func (r *RollbackREST) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return r.Create(ctx, obj) +} + func (r *RollbackREST) rollbackDeployment(ctx genericapirequest.Context, deploymentID string, config *extensions.RollbackConfig, annotations map[string]string) error { if _, err := r.setDeploymentRollback(ctx, deploymentID, config, annotations); err != nil { err = storeerr.InterpretGetError(err, extensions.Resource("deployments"), deploymentID) diff --git a/pkg/registry/rbac/clusterrole/policybased/storage.go b/pkg/registry/rbac/clusterrole/policybased/storage.go index 12ad18b5859..0d56bd5ddc9 100644 --- a/pkg/registry/rbac/clusterrole/policybased/storage.go +++ b/pkg/registry/rbac/clusterrole/policybased/storage.go @@ -52,6 +52,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go index 6794f88c696..6794c583f8a 100644 --- a/pkg/registry/rbac/clusterrolebinding/policybased/storage.go +++ b/pkg/registry/rbac/clusterrolebinding/policybased/storage.go @@ -63,6 +63,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/pkg/registry/rbac/role/policybased/storage.go b/pkg/registry/rbac/role/policybased/storage.go index 879b10419f8..fb51b37be79 100644 --- a/pkg/registry/rbac/role/policybased/storage.go +++ b/pkg/registry/rbac/role/policybased/storage.go @@ -52,6 +52,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/pkg/registry/rbac/rolebinding/policybased/storage.go b/pkg/registry/rbac/rolebinding/policybased/storage.go index ad53ea3a6c2..13adaa1dcac 100644 --- a/pkg/registry/rbac/rolebinding/policybased/storage.go +++ b/pkg/registry/rbac/rolebinding/policybased/storage.go @@ -69,6 +69,10 @@ func (s *Storage) Create(ctx genericapirequest.Context, obj runtime.Object) (run return s.StandardStorage.Create(ctx, obj) } +func (s *Storage) CreateInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + return s.Create(ctx, obj) +} + func (s *Storage) Update(ctx genericapirequest.Context, name string, obj rest.UpdatedObjectInfo) (runtime.Object, bool, error) { if rbacregistry.EscalationAllowed(ctx) { return s.StandardStorage.Update(ctx, name, obj) diff --git a/plugin/BUILD b/plugin/BUILD index 2af6b49bbce..5209dbdd37a 100644 --- a/plugin/BUILD +++ b/plugin/BUILD @@ -22,6 +22,7 @@ filegroup( "//plugin/pkg/admission/exec:all-srcs", "//plugin/pkg/admission/gc:all-srcs", "//plugin/pkg/admission/imagepolicy:all-srcs", + "//plugin/pkg/admission/initialization:all-srcs", "//plugin/pkg/admission/initialresources:all-srcs", "//plugin/pkg/admission/limitranger:all-srcs", "//plugin/pkg/admission/namespace/autoprovision:all-srcs", diff --git a/plugin/pkg/admission/initialization/BUILD b/plugin/pkg/admission/initialization/BUILD new file mode 100644 index 00000000000..88838997ef7 --- /dev/null +++ b/plugin/pkg/admission/initialization/BUILD @@ -0,0 +1,38 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["initialization.go"], + tags = ["automanaged"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/validation:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", + "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", + "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/plugin/pkg/admission/initialization/initialization.go b/plugin/pkg/admission/initialization/initialization.go new file mode 100644 index 00000000000..fe7db064f7a --- /dev/null +++ b/plugin/pkg/admission/initialization/initialization.go @@ -0,0 +1,169 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package initialization + +import ( + "fmt" + "io" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/validation" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/authorization/authorizer" +) + +// Register registers a plugin +func Register(plugins *admission.Plugins) { + plugins.Register("Initializers", func(config io.Reader) (admission.Interface, error) { + return NewInitializer(), nil + }) +} + +type initializerOptions struct { + Initializers []string +} + +type initializer struct { + resources map[schema.GroupResource]initializerOptions + authorizer authorizer.Authorizer +} + +// NewAlwaysAdmit creates a new always admit admission handler +func NewInitializer() admission.Interface { + return &initializer{ + resources: map[schema.GroupResource]initializerOptions{ + //schema.GroupResource{Resource: "pods"}: {Initializers: []string{"Test"}}, + }, + } +} + +func (i *initializer) Validate() error { + if i.authorizer == nil { + return fmt.Errorf("requires authorizer") + } + return nil +} + +func (i *initializer) SetAuthorizer(a authorizer.Authorizer) { + i.authorizer = a +} + +var initializerFieldPath = field.NewPath("metadata", "initializers") + +func (i *initializer) Admit(a admission.Attributes) (err error) { + // TODO: sub-resource action should be denied until the object is initialized + if len(a.GetSubresource()) > 0 { + return nil + } + + resource, ok := i.resources[a.GetResource().GroupResource()] + if !ok { + return nil + } + + switch a.GetOperation() { + case admission.Create: + accessor, err := meta.Accessor(a.GetObject()) + if err != nil { + // objects without meta accessor cannot be checked for initialization, and it is possible to make calls + // via our API that don't have ObjectMeta + return nil + } + existing := accessor.GetInitializers() + // it must be possible for some users to bypass initialization - for now, check the initialize operation + if existing != nil { + if err := i.canInitialize(a); err != nil { + return err + } + } + + // TODO: pull this from config + accessor.SetInitializers(copiedInitializers(resource.Initializers)) + + case admission.Update: + accessor, err := meta.Accessor(a.GetObject()) + if err != nil { + // objects without meta accessor cannot be checked for initialization, and it is possible to make calls + // via our API that don't have ObjectMeta + return nil + } + updated := accessor.GetInitializers() + + existingAccessor, err := meta.Accessor(a.GetOldObject()) + if err != nil { + // if the old object does not have an accessor, but the new one does, error out + return fmt.Errorf("initialized resources must be able to set initializers (%T): %v", a.GetOldObject(), err) + } + existing := existingAccessor.GetInitializers() + + // because we are called before validation, we need to ensure the update transition is valid. + if errs := validation.ValidateInitializersUpdate(updated, existing, initializerFieldPath); len(errs) > 0 { + return errors.NewInvalid(a.GetKind().GroupKind(), a.GetName(), errs) + } + + // caller must have the ability to mutate un-initialized resources + if err := i.canInitialize(a); err != nil { + return err + } + + // TODO: restrict initialization list changes to specific clients? + } + + return nil +} + +func (i *initializer) canInitialize(a admission.Attributes) error { + // caller must have the ability to mutate un-initialized resources + authorized, reason, err := i.authorizer.Authorize(authorizer.AttributesRecord{ + Name: a.GetName(), + ResourceRequest: true, + User: a.GetUserInfo(), + Verb: "initialize", + Namespace: a.GetNamespace(), + APIGroup: a.GetResource().Group, + APIVersion: a.GetResource().Version, + Resource: a.GetResource().Resource, + }) + if err != nil { + return err + } + if !authorized { + return fmt.Errorf("user must have permission to initialize resources: %s", reason) + } + return nil +} + +func (i *initializer) Handles(op admission.Operation) bool { + return true +} + +func copiedInitializers(names []string) *metav1.Initializers { + if len(names) == 0 { + return nil + } + var init []metav1.Initializer + for _, name := range names { + init = append(init, metav1.Initializer{Name: name}) + } + return &metav1.Initializers{ + Pending: init, + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go b/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go index 04f605843df..84bd9cdedc6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go @@ -184,10 +184,41 @@ func ValidateObjectMetaAccessor(meta metav1.Object, requiresNamespace bool, name allErrs = append(allErrs, v1validation.ValidateLabels(meta.GetLabels(), fldPath.Child("labels"))...) allErrs = append(allErrs, ValidateAnnotations(meta.GetAnnotations(), fldPath.Child("annotations"))...) allErrs = append(allErrs, ValidateOwnerReferences(meta.GetOwnerReferences(), fldPath.Child("ownerReferences"))...) + allErrs = append(allErrs, ValidateInitializers(meta.GetInitializers(), fldPath.Child("initializers"))...) allErrs = append(allErrs, ValidateFinalizers(meta.GetFinalizers(), fldPath.Child("finalizers"))...) return allErrs } +func ValidateInitializers(initializers *metav1.Initializers, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + if initializers == nil { + return allErrs + } + for i, initializer := range initializers.Pending { + for _, msg := range validation.IsQualifiedName(initializer.Name) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("pending").Index(i), initializer.Name, msg)) + } + } + allErrs = append(allErrs, validateInitializersResult(initializers.Result, fldPath.Child("result"))...) + if len(initializers.Pending) == 0 && initializers.Result == nil { + allErrs = append(allErrs, field.Invalid(fldPath.Child("pending"), nil, "must be non-empty when result is not set")) + } + return allErrs +} + +func validateInitializersResult(result *metav1.Status, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + if result == nil { + return allErrs + } + switch result.Status { + case metav1.StatusFailure: + default: + allErrs = append(allErrs, field.Invalid(fldPath.Child("status"), result.Status, "must be 'Failure'")) + } + return allErrs +} + // ValidateFinalizers tests if the finalizers name are valid, and if there are conflicting finalizers. func ValidateFinalizers(finalizers []string, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} @@ -226,7 +257,7 @@ func ValidateObjectMetaUpdate(newMeta, oldMeta *metav1.ObjectMeta, fldPath *fiel } func ValidateObjectMetaAccessorUpdate(newMeta, oldMeta metav1.Object, fldPath *field.Path) field.ErrorList { - allErrs := field.ErrorList{} + var allErrs field.ErrorList if !RepairMalformedUpdates && newMeta.GetUID() != oldMeta.GetUID() { allErrs = append(allErrs, field.Invalid(fldPath.Child("uid"), newMeta.GetUID(), "field is immutable")) @@ -276,6 +307,8 @@ func ValidateObjectMetaAccessorUpdate(newMeta, oldMeta metav1.Object, fldPath *f allErrs = append(allErrs, field.Invalid(fldPath.Child("generation"), newMeta.GetGeneration(), "must not be decremented")) } + allErrs = append(allErrs, ValidateInitializersUpdate(newMeta.GetInitializers(), oldMeta.GetInitializers(), fldPath.Child("initializers"))...) + allErrs = append(allErrs, ValidateImmutableField(newMeta.GetName(), oldMeta.GetName(), fldPath.Child("name"))...) allErrs = append(allErrs, ValidateImmutableField(newMeta.GetNamespace(), oldMeta.GetNamespace(), fldPath.Child("namespace"))...) allErrs = append(allErrs, ValidateImmutableField(newMeta.GetUID(), oldMeta.GetUID(), fldPath.Child("uid"))...) @@ -288,3 +321,28 @@ func ValidateObjectMetaAccessorUpdate(newMeta, oldMeta metav1.Object, fldPath *f return allErrs } + +// ValidateInitializersUpdate checks the update of the metadata initializers field +func ValidateInitializersUpdate(newInit, oldInit *metav1.Initializers, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + switch { + case oldInit == nil && newInit != nil: + // Initializers may not be set on new objects + allErrs = append(allErrs, field.Invalid(fldPath, nil, "field is immutable once initialization has completed")) + case oldInit != nil && newInit == nil: + // this is a valid transition and means initialization was successful + case oldInit != nil && newInit != nil: + // validate changes to initializers + switch { + case oldInit.Result == nil && newInit.Result != nil: + // setting a result is allowed + allErrs = append(allErrs, validateInitializersResult(newInit.Result, fldPath.Child("result"))...) + case oldInit.Result != nil: + // setting Result implies permanent failure, and all future updates will be prevented + allErrs = append(allErrs, ValidateImmutableField(newInit.Result, oldInit.Result, fldPath.Child("result"))...) + default: + // leaving the result nil is allowed + } + } + return allErrs +} diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go index 73e7d475624..eff9a7e12e4 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go @@ -18,6 +18,7 @@ package internalversion import ( "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/util/validation/field" @@ -30,6 +31,7 @@ func Convert_internalversion_ListOptions_To_v1_ListOptions(in *ListOptions, out if err := metav1.Convert_labels_Selector_To_string(&in.LabelSelector, &out.LabelSelector, s); err != nil { return err } + out.IncludeUninitialized = in.IncludeUninitialized out.ResourceVersion = in.ResourceVersion out.TimeoutSeconds = in.TimeoutSeconds out.Watch = in.Watch @@ -43,6 +45,7 @@ func Convert_v1_ListOptions_To_internalversion_ListOptions(in *metav1.ListOption if err := metav1.Convert_string_To_labels_Selector(&in.LabelSelector, &out.LabelSelector, s); err != nil { return err } + out.IncludeUninitialized = in.IncludeUninitialized out.ResourceVersion = in.ResourceVersion out.TimeoutSeconds = in.TimeoutSeconds out.Watch = in.Watch diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go index 576baeac64e..0aa4188df2d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go @@ -33,7 +33,7 @@ type ListOptions struct { FieldSelector fields.Selector // If true, partially initialized resources are included in the response. // +optional - IncludeUninitialized bool `json:"includeUninitialized,omitempty"` + IncludeUninitialized bool // If true, watch for changes to this list Watch bool // When specified with a watch call, shows changes that occur after that particular version of a resource. diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD index 0b4366c93ae..b6b0d2bb5d4 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/BUILD @@ -22,9 +22,11 @@ go_library( deps = [ "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/conversion/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", ], ) diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go index a74ed02689a..d9a4f887b7a 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructured.go @@ -27,10 +27,12 @@ import ( "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) // Unstructured allows objects that do not have Golang structs registered to be manipulated @@ -452,12 +454,34 @@ func (u *Unstructured) GroupVersionKind() schema.GroupVersionKind { return gvk } +var converter = unstructured.NewConverter(false) + func (u *Unstructured) GetInitializers() *metav1.Initializers { - panic("not implemented") + field := getNestedField(u.Object, "metadata", "initializers") + if field == nil { + return nil + } + obj, ok := field.(map[string]interface{}) + if !ok { + return nil + } + out := &metav1.Initializers{} + if err := converter.FromUnstructured(obj, out); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to retrieve initializers for object: %v", err)) + } + return out } func (u *Unstructured) SetInitializers(initializers *metav1.Initializers) { - panic("not implemented") + if initializers == nil { + setNestedField(u.Object, nil, "metadata", "initializers") + return + } + out := make(map[string]interface{}) + if err := converter.ToUnstructured(initializers, &out); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to retrieve initializers for object: %v", err)) + } + setNestedField(u.Object, out, "metadata", "initializers") } func (u *Unstructured) GetFinalizers() []string { diff --git a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD index b0337b40f4c..0804afc3301 100644 --- a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/BUILD @@ -31,7 +31,6 @@ go_library( deps = [ "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go index fc28406b0b5..cf84a619898 100644 --- a/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go +++ b/staging/src/k8s.io/apimachinery/pkg/conversion/unstructured/converter.go @@ -29,7 +29,6 @@ import ( "sync/atomic" apiequality "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/json" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -37,11 +36,11 @@ import ( "github.com/golang/glog" ) -// Converter is an interface for converting between runtime.Object +// Converter is an interface for converting between interface{} // and map[string]interface representation. type Converter interface { - ToUnstructured(obj runtime.Object, u *map[string]interface{}) error - FromUnstructured(u map[string]interface{}, obj runtime.Object) error + ToUnstructured(obj interface{}, u *map[string]interface{}) error + FromUnstructured(u map[string]interface{}, obj interface{}) error } type structField struct { @@ -92,7 +91,7 @@ func parseBool(key string) bool { return value } -// ConverterImpl knows how to convert betweek runtime.Object and +// ConverterImpl knows how to convert between interface{} and // Unstructured in both ways. type converterImpl struct { // If true, we will be additionally running conversion via json @@ -107,10 +106,15 @@ func NewConverter(mismatchDetection bool) Converter { } } -func (c *converterImpl) FromUnstructured(u map[string]interface{}, obj runtime.Object) error { - err := fromUnstructured(reflect.ValueOf(u), reflect.ValueOf(obj).Elem()) +func (c *converterImpl) FromUnstructured(u map[string]interface{}, obj interface{}) error { + t := reflect.TypeOf(obj) + value := reflect.ValueOf(obj) + if t.Kind() != reflect.Ptr || value.IsNil() { + return fmt.Errorf("FromUnstructured requires a non-nil pointer to an object, got %v", t) + } + err := fromUnstructured(reflect.ValueOf(u), value.Elem()) if c.mismatchDetection { - newObj := reflect.New(reflect.TypeOf(obj).Elem()).Interface().(runtime.Object) + newObj := reflect.New(t.Elem()).Interface() newErr := fromUnstructuredViaJSON(u, newObj) if (err != nil) != (newErr != nil) { glog.Fatalf("FromUnstructured unexpected error for %v: error: %v", u, err) @@ -122,7 +126,7 @@ func (c *converterImpl) FromUnstructured(u map[string]interface{}, obj runtime.O return err } -func fromUnstructuredViaJSON(u map[string]interface{}, obj runtime.Object) error { +func fromUnstructuredViaJSON(u map[string]interface{}, obj interface{}) error { data, err := json.Marshal(u) if err != nil { return err @@ -384,8 +388,13 @@ func interfaceFromUnstructured(sv, dv reflect.Value) error { return nil } -func (c *converterImpl) ToUnstructured(obj runtime.Object, u *map[string]interface{}) error { - err := toUnstructured(reflect.ValueOf(obj).Elem(), reflect.ValueOf(u).Elem()) +func (c *converterImpl) ToUnstructured(obj interface{}, u *map[string]interface{}) error { + t := reflect.TypeOf(obj) + value := reflect.ValueOf(obj) + if t.Kind() != reflect.Ptr || value.IsNil() { + return fmt.Errorf("ToUnstructured requires a non-nil pointer to an object, got %v", t) + } + err := toUnstructured(value.Elem(), reflect.ValueOf(u).Elem()) if c.mismatchDetection { newUnstr := &map[string]interface{}{} newErr := toUnstructuredViaJSON(obj, newUnstr) @@ -399,7 +408,7 @@ func (c *converterImpl) ToUnstructured(obj runtime.Object, u *map[string]interfa return err } -func toUnstructuredViaJSON(obj runtime.Object, u *map[string]interface{}) error { +func toUnstructuredViaJSON(obj interface{}, u *map[string]interface{}) error { data, err := json.Marshal(obj) if err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 24f0698f9ee..467e7dc5777 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -407,6 +407,7 @@ type SimpleRESTStorage struct { fakeWatch *watch.FakeWatcher requestedLabelSelector labels.Selector requestedFieldSelector fields.Selector + requestedUninitialized bool requestedResourceVersion string requestedResourceNamespace string @@ -449,6 +450,7 @@ func (storage *SimpleRESTStorage) List(ctx request.Context, options *metainterna if options != nil && options.FieldSelector != nil { storage.requestedFieldSelector = options.FieldSelector } + storage.requestedUninitialized = options.IncludeUninitialized return result, storage.errors["list"] } @@ -522,7 +524,7 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object { return &genericapitesting.SimpleList{} } -func (storage *SimpleRESTStorage) Create(ctx request.Context, obj runtime.Object) (runtime.Object, error) { +func (storage *SimpleRESTStorage) Create(ctx request.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { storage.checkContext(ctx) storage.created = obj.(*genericapitesting.Simple) if err := storage.errors["create"]; err != nil { @@ -717,7 +719,7 @@ type NamedCreaterRESTStorage struct { createdName string } -func (storage *NamedCreaterRESTStorage) Create(ctx request.Context, name string, obj runtime.Object) (runtime.Object, error) { +func (storage *NamedCreaterRESTStorage) Create(ctx request.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { storage.checkContext(ctx) storage.created = obj.(*genericapitesting.Simple) storage.createdName = name @@ -1470,6 +1472,52 @@ func TestGet(t *testing.T) { } } +func TestGetUninitialized(t *testing.T) { + storage := map[string]rest.Storage{} + simpleStorage := SimpleRESTStorage{ + list: []genericapitesting.Simple{ + { + ObjectMeta: metav1.ObjectMeta{ + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "test"}}, + }, + }, + Other: "foo", + }, + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id", + alternativeSet: sets.NewString("/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple"), + name: "id", + namespace: "default", + } + storage["simple"] = &simpleStorage + handler := handleLinker(storage, selfLinker) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple?includeUninitialized=true") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut genericapitesting.SimpleList + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(itemOut.Items) != 1 || itemOut.Items[0].Other != "foo" { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !simpleStorage.requestedUninitialized { + t.Errorf("Didn't set correct flag") + } +} + func TestGetPretty(t *testing.T) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{ diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index 578d1c95ff2..e57b694b61f 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -449,13 +449,12 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object } } + // TODO: replace with content type negotiation? + includeUninitialized := req.URL.Query().Get("includeUninitialized") == "1" + trace.Step("About to store object in database") result, err := finishRequest(timeout, func() (runtime.Object, error) { - out, err := r.Create(ctx, name, obj) - if status, ok := out.(*metav1.Status); ok && err == nil && status.Code == 0 { - status.Code = http.StatusCreated - } - return out, err + return r.Create(ctx, name, obj, includeUninitialized) }) if err != nil { scope.err(err, w, req) @@ -474,7 +473,19 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object } trace.Step("Self-link added") - transformResponseObject(ctx, scope, req, w, http.StatusCreated, result) + // If the object is partially initialized, always indicate it via StatusAccepted + code := http.StatusCreated + if accessor, err := meta.Accessor(result); err == nil { + if accessor.GetInitializers() != nil { + code = http.StatusAccepted + } + } + status, ok := result.(*metav1.Status) + if ok && err == nil && status.Code == 0 { + status.Code = int32(code) + } + + transformResponseObject(ctx, scope, req, w, code, result) } } @@ -492,8 +503,8 @@ type namedCreaterAdapter struct { rest.Creater } -func (c *namedCreaterAdapter) Create(ctx request.Context, name string, obj runtime.Object) (runtime.Object, error) { - return c.Creater.Create(ctx, obj) +func (c *namedCreaterAdapter) Create(ctx request.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { + return c.Creater.Create(ctx, obj, includeUninitialized) } // PatchResource returns a function that will handle a resource patch diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 74ddaee2c67..77b55229e88 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -259,6 +259,7 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection // By default we should serve the request from etcd. options = &metainternalversion.ListOptions{ResourceVersion: ""} } + p.IncludeUninitialized = options.IncludeUninitialized list := e.NewListFunc() if name, ok := p.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { @@ -273,7 +274,7 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection } // Create inserts a new item according to the unique key from the object. -func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { +func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) { if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { return nil, err } @@ -319,15 +320,91 @@ func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object) (runti return nil, err } } + if !includeUninitialized { + return e.WaitForInitialized(ctx, out) + } return out, nil } +func (e *Store) WaitForInitialized(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) { + // return early if we don't have initializers, or if they've completed already + accessor, err := meta.Accessor(obj) + if err != nil { + return obj, nil + } + initializers := accessor.GetInitializers() + if initializers == nil { + return obj, nil + } + if result := initializers.Result; result != nil { + return nil, kubeerr.FromObject(result) + } + + key, err := e.KeyFunc(ctx, accessor.GetName()) + if err != nil { + return nil, err + } + w, err := e.Storage.Watch(ctx, key, accessor.GetResourceVersion(), storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + + IncludeUninitialized: true, + }) + if err != nil { + return nil, err + } + defer w.Stop() + + latest := obj + ch := w.ResultChan() + for { + select { + case event, ok := <-ch: + if !ok { + // TODO: should we just expose the partially initialized object? + return nil, kubeerr.NewServerTimeout(e.QualifiedResource, "create", 0) + } + switch event.Type { + case watch.Deleted: + if latest = event.Object; latest != nil { + if accessor, err := meta.Accessor(latest); err == nil { + if initializers := accessor.GetInitializers(); initializers != nil && initializers.Result != nil { + // initialization failed, but we missed the modification event + return nil, kubeerr.FromObject(initializers.Result) + } + } + } + return nil, kubeerr.NewInternalError(fmt.Errorf("object deleted while waiting for creation")) + case watch.Error: + if status, ok := event.Object.(*metav1.Status); ok { + return nil, &kubeerr.StatusError{ErrStatus: *status} + } + return nil, kubeerr.NewInternalError(fmt.Errorf("unexpected object in watch stream, can't complete initialization %T", event.Object)) + case watch.Modified: + latest = event.Object + accessor, err = meta.Accessor(latest) + if err != nil { + return nil, kubeerr.NewInternalError(fmt.Errorf("object no longer has access to metadata %T: %v", latest, err)) + } + initializers := accessor.GetInitializers() + if initializers == nil { + // completed initialization + return latest, nil + } + if result := initializers.Result; result != nil { + // initialization failed + return nil, kubeerr.FromObject(result) + } + } + case <-ctx.Done(): + } + } +} + // shouldDeleteDuringUpdate checks if a Update is removing all the object's // finalizers. If so, it further checks if the object's -// DeletionGracePeriodSeconds is 0. If so, it returns true. -// -// If the store does not have garbage collection enabled, -// shouldDeleteDuringUpdate will always return false. +// DeletionGracePeriodSeconds is 0. If so, it returns true. If garbage collection +// is disabled it always returns false. func (e *Store) shouldDeleteDuringUpdate(ctx genericapirequest.Context, key string, obj, existing runtime.Object) bool { if !e.EnableGarbageCollection { return false @@ -345,9 +422,23 @@ func (e *Store) shouldDeleteDuringUpdate(ctx genericapirequest.Context, key stri return len(newMeta.GetFinalizers()) == 0 && oldMeta.GetDeletionGracePeriodSeconds() != nil && *oldMeta.GetDeletionGracePeriodSeconds() == 0 } -// deleteForEmptyFinalizers handles deleting an object once its finalizer list -// becomes empty due to an update. -func (e *Store) deleteForEmptyFinalizers(ctx genericapirequest.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions) (runtime.Object, bool, error) { +// shouldDeleteForFailedInitialization returns true if the provided object is initializing and has +// a failure recorded. +func (e *Store) shouldDeleteForFailedInitialization(ctx genericapirequest.Context, obj runtime.Object) bool { + m, err := meta.Accessor(obj) + if err != nil { + utilruntime.HandleError(err) + return false + } + if initializers := m.GetInitializers(); initializers != nil && initializers.Result != nil { + return true + } + return false +} + +// deleteWithoutFinalizers handles deleting an object ignoring its finalizer list. +// Used for objects that are either been finalized or have never initialized. +func (e *Store) deleteWithoutFinalizers(ctx genericapirequest.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions) (runtime.Object, bool, error) { out := e.NewFunc() glog.V(6).Infof("going to delete %s from registry, triggered by update", name) if err := e.Storage.Delete(ctx, key, out, preconditions); err != nil { @@ -477,7 +568,7 @@ func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest. if err != nil { // delete the object if err == errEmptiedFinalizers { - return e.deleteForEmptyFinalizers(ctx, name, key, deleteObj, storagePreconditions) + return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions) } if creating { err = storeerr.InterpretCreateError(err, e.QualifiedResource, name) @@ -487,6 +578,11 @@ func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest. } return nil, false, err } + + if e.shouldDeleteForFailedInitialization(ctx, out) { + return e.deleteWithoutFinalizers(ctx, name, key, out, storagePreconditions) + } + if creating { if e.AfterCreate != nil { if err := e.AfterCreate(out); err != nil { @@ -1025,11 +1121,14 @@ func (e *Store) Watch(ctx genericapirequest.Context, options *metainternalversio if options != nil && options.FieldSelector != nil { field = options.FieldSelector } + predicate := e.PredicateFunc(label, field) + resourceVersion := "" if options != nil { resourceVersion = options.ResourceVersion + predicate.IncludeUninitialized = options.IncludeUninitialized } - return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion) + return e.WatchPredicate(ctx, predicate, resourceVersion) } // WatchPredicate starts a watch for the items that m matches. diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index d589d57256b..d0443115efd 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -117,9 +118,9 @@ func NewTestGenericStoreRegistry(t *testing.T) (factory.DestroyFunc, *Store) { return newTestGenericStoreRegistry(t, scheme, false) } -func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func getPodAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return labels.Set{"name": pod.ObjectMeta.Name}, nil, nil + return labels.Set{"name": pod.ObjectMeta.Name}, nil, pod.Initializers != nil, nil } // matchPodName returns selection predicate that matches any pod with name in the set. @@ -142,8 +143,8 @@ func matchEverything() storage.SelectionPredicate { return storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) { - return nil, nil, nil + GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) { + return nil, nil, false, nil }, } } @@ -238,7 +239,7 @@ func TestStoreListResourceVersion(t *testing.T) { destroyFunc, registry := newTestGenericStoreRegistry(t, scheme, true) defer destroyFunc() - obj, err := registry.Create(ctx, fooPod) + obj, err := registry.Create(ctx, fooPod, false) if err != nil { t.Fatal(err) } @@ -268,7 +269,7 @@ func TestStoreListResourceVersion(t *testing.T) { t.Fatalf("expected waiting, but get %#v", l) } - if _, err := registry.Create(ctx, barPod); err != nil { + if _, err := registry.Create(ctx, barPod, false); err != nil { t.Fatal(err) } @@ -305,7 +306,7 @@ func TestStoreCreate(t *testing.T) { registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy} // create the object - objA, err := registry.Create(testContext, podA) + objA, err := registry.Create(testContext, podA, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -322,7 +323,7 @@ func TestStoreCreate(t *testing.T) { } // now try to create the second pod - _, err = registry.Create(testContext, podB) + _, err = registry.Create(testContext, podB, false) if !errors.IsAlreadyExists(err) { t.Errorf("Unexpected error: %v", err) } @@ -341,7 +342,7 @@ func TestStoreCreate(t *testing.T) { } // try to create before graceful deletion period is over - _, err = registry.Create(testContext, podA) + _, err = registry.Create(testContext, podA, false) if err == nil || !errors.IsAlreadyExists(err) { t.Fatalf("Expected 'already exists' error from storage, but got %v", err) } @@ -353,6 +354,208 @@ func TestStoreCreate(t *testing.T) { } } +func isPendingInitialization(obj metav1.Object) bool { + return obj.GetInitializers() != nil && obj.GetInitializers().Result == nil && len(obj.GetInitializers().Pending) > 0 +} + +func hasInitializers(obj metav1.Object, expected ...string) bool { + if !isPendingInitialization(obj) { + return false + } + if len(expected) != len(obj.GetInitializers().Pending) { + return false + } + for i, init := range obj.GetInitializers().Pending { + if init.Name != expected[i] { + return false + } + } + return true +} + +func isFailedInitialization(obj metav1.Object) bool { + return obj.GetInitializers() != nil && obj.GetInitializers().Result != nil && obj.GetInitializers().Result.Status == metav1.StatusFailure +} + +func isInitialized(obj metav1.Object) bool { + return obj.GetInitializers() == nil +} + +func TestStoreCreateInitialized(t *testing.T) { + podA := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", Namespace: "test", + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + }, + }, + Spec: example.PodSpec{NodeName: "machine"}, + } + + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + ch := make(chan struct{}) + chObserver := make(chan struct{}) + + // simulate a background initializer that initializes the object + early := make(chan struct{}, 1) + go func() { + defer close(ch) + w, err := registry.Watch(ctx, &metainternalversion.ListOptions{ + IncludeUninitialized: true, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"), + }) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + event := <-w.ResultChan() + pod := event.Object.(*example.Pod) + if event.Type != watch.Added || !hasInitializers(pod, "Test") { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + + select { + case <-early: + t.Fatalf("CreateInitialized should not have returned") + default: + } + + pod.Initializers = nil + updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) + if err != nil { + t.Fatal(err) + } + pod = updated.(*example.Pod) + if !isInitialized(pod) { + t.Fatalf("unexpected update: %#v", pod.Initializers) + } + + event = <-w.ResultChan() + if event.Type != watch.Modified || !isInitialized(event.Object.(*example.Pod)) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + }() + + // create a background worker that should only observe the final creation + go func() { + defer close(chObserver) + w, err := registry.Watch(ctx, &metainternalversion.ListOptions{ + IncludeUninitialized: false, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"), + }) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + event := <-w.ResultChan() + pod := event.Object.(*example.Pod) + if event.Type != watch.Added || !isInitialized(pod) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + }() + + // create the object + objA, err := registry.Create(ctx, podA, false) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // signal that we're now waiting, then wait for both observers to see + // the result of the create. + early <- struct{}{} + <-ch + <-chObserver + + // get the object + checkobj, err := registry.Get(ctx, podA.Name, &metav1.GetOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // verify objects are equal + if e, a := objA, checkobj; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} + +func TestStoreCreateInitializedFailed(t *testing.T) { + podA := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", Namespace: "test", + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + }, + }, + Spec: example.PodSpec{NodeName: "machine"}, + } + + ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer destroyFunc() + + ch := make(chan struct{}) + go func() { + w, err := registry.Watch(ctx, &metainternalversion.ListOptions{ + IncludeUninitialized: true, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", "foo"), + }) + if err != nil { + t.Fatal(err) + } + event := <-w.ResultChan() + pod := event.Object.(*example.Pod) + if event.Type != watch.Added || !hasInitializers(pod, "Test") { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + pod.Initializers.Pending = nil + pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure, Code: 403, Reason: metav1.StatusReasonForbidden, Message: "induced failure"} + updated, _, err := registry.Update(ctx, podA.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) + if err != nil { + t.Fatal(err) + } + pod = updated.(*example.Pod) + if !isFailedInitialization(pod) { + t.Fatalf("unexpected update: %#v", pod.Initializers) + } + + event = <-w.ResultChan() + if event.Type != watch.Modified || !isFailedInitialization(event.Object.(*example.Pod)) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + + event = <-w.ResultChan() + if event.Type != watch.Deleted || !isFailedInitialization(event.Object.(*example.Pod)) { + t.Fatalf("unexpected event: %s %#v", event.Type, event.Object) + } + w.Stop() + close(ch) + }() + + // create the object + _, err := registry.Create(ctx, podA, false) + if !errors.IsForbidden(err) { + t.Fatalf("unexpected error: %#v", err.(errors.APIStatus).Status()) + } + if err.(errors.APIStatus).Status().Message != "induced failure" { + t.Fatalf("unexpected error: %#v", err) + } + + <-ch + + // get the object + _, err = registry.Get(ctx, podA.Name, &metav1.GetOptions{}) + if !errors.IsNotFound(err) { + t.Fatalf("Unexpected error: %v", err) + } +} + func updateAndVerify(t *testing.T, ctx genericapirequest.Context, registry *Store, pod *example.Pod) bool { obj, _, err := registry.Update(ctx, pod.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) if err != nil { @@ -440,7 +643,7 @@ func TestNoOpUpdates(t *testing.T) { var err error var createResult runtime.Object - if createResult, err = registry.Create(genericapirequest.NewDefaultContext(), newPod()); err != nil { + if createResult, err = registry.Create(genericapirequest.NewDefaultContext(), newPod(), false); err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -615,7 +818,7 @@ func TestStoreDelete(t *testing.T) { } // create pod - _, err = registry.Create(testContext, podA) + _, err = registry.Create(testContext, podA, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -687,7 +890,7 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) { registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy} defer destroyFunc() // create pod - _, err := registry.Create(testContext, podWithFinalizer) + _, err := registry.Create(testContext, podWithFinalizer, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -735,6 +938,43 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) { } } +func TestFailedInitializationStoreUpdate(t *testing.T) { + initialGeneration := int64(1) + podInitializing := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "Test"}}}, Generation: initialGeneration}, + Spec: example.PodSpec{NodeName: "machine"}, + } + + testContext := genericapirequest.WithNamespace(genericapirequest.NewContext(), "test") + destroyFunc, registry := NewTestGenericStoreRegistry(t) + registry.EnableGarbageCollection = true + defaultDeleteStrategy := testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true} + registry.DeleteStrategy = testGracefulStrategy{defaultDeleteStrategy} + defer destroyFunc() + + // create pod, view initializing + obj, err := registry.Create(testContext, podInitializing, true) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + pod := obj.(*example.Pod) + + // update the pod with initialization failure, the pod should be deleted + pod.Initializers.Result = &metav1.Status{Status: metav1.StatusFailure} + result, _, err := registry.Update(testContext, podInitializing.Name, rest.DefaultUpdatedObjectInfo(pod, scheme)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = registry.Get(testContext, podInitializing.Name, &metav1.GetOptions{}) + if err == nil || !errors.IsNotFound(err) { + t.Fatalf("Unexpected error: %v", err) + } + pod = result.(*example.Pod) + if pod.Initializers == nil || pod.Initializers.Result == nil || pod.Initializers.Result.Status != metav1.StatusFailure { + t.Fatalf("Pod returned from update was not correct: %#v", pod) + } +} + func TestNonGracefulStoreHandleFinalizers(t *testing.T) { initialGeneration := int64(1) podWithFinalizer := &example.Pod{ @@ -747,7 +987,7 @@ func TestNonGracefulStoreHandleFinalizers(t *testing.T) { registry.EnableGarbageCollection = true defer destroyFunc() // create pod - _, err := registry.Create(testContext, podWithFinalizer) + _, err := registry.Create(testContext, podWithFinalizer, false) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -1048,7 +1288,7 @@ func TestStoreDeleteWithOrphanDependents(t *testing.T) { for _, tc := range testcases { registry.DeleteStrategy = tc.strategy // create pod - _, err := registry.Create(testContext, tc.pod) + _, err := registry.Create(testContext, tc.pod, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1267,7 +1507,7 @@ func TestStoreDeletionPropagation(t *testing.T) { i++ pod := createPod(i, tc.existingFinalizers) // create pod - _, err := registry.Create(testContext, pod) + _, err := registry.Create(testContext, pod, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1311,10 +1551,10 @@ func TestStoreDeleteCollection(t *testing.T) { destroyFunc, registry := NewTestGenericStoreRegistry(t) defer destroyFunc() - if _, err := registry.Create(testContext, podA); err != nil { + if _, err := registry.Create(testContext, podA, false); err != nil { t.Errorf("Unexpected error: %v", err) } - if _, err := registry.Create(testContext, podB); err != nil { + if _, err := registry.Create(testContext, podB, false); err != nil { t.Errorf("Unexpected error: %v", err) } @@ -1347,10 +1587,10 @@ func TestStoreDeleteCollectionNotFound(t *testing.T) { for i := 0; i < 10; i++ { // Setup - if _, err := registry.Create(testContext, podA); err != nil { + if _, err := registry.Create(testContext, podA, false); err != nil { t.Errorf("Unexpected error: %v", err) } - if _, err := registry.Create(testContext, podB); err != nil { + if _, err := registry.Create(testContext, podB, false); err != nil { t.Errorf("Unexpected error: %v", err) } @@ -1386,7 +1626,7 @@ func TestStoreDeleteCollectionWithWatch(t *testing.T) { destroyFunc, registry := NewTestGenericStoreRegistry(t) defer destroyFunc() - objCreated, err := registry.Create(testContext, podA) + objCreated, err := registry.Create(testContext, podA, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1455,7 +1695,7 @@ func TestStoreWatch(t *testing.T) { if err != nil { t.Errorf("%v: unexpected error: %v", name, err) } else { - obj, err := registry.Create(testContext, podA) + obj, err := registry.Create(testContext, podA, false) if err != nil { got, open := <-wi.ResultChan() if !open { @@ -1530,12 +1770,12 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE return storage.SelectionPredicate{ Label: label, Field: field, - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod, ok := obj.(*example.Pod) if !ok { - return nil, nil, fmt.Errorf("not a pod") + return nil, nil, false, fmt.Errorf("not a pod") } - return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), nil + return labels.Set(pod.ObjectMeta.Labels), generic.ObjectMetaFieldsSet(&pod.ObjectMeta, true), pod.Initializers != nil, nil }, } }, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index 015754f8602..4421ffcac34 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -174,8 +174,9 @@ type Creater interface { // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) New() runtime.Object - // Create creates a new version of a resource. - Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) + // Create creates a new version of a resource. If includeUninitialized is set, the object may be returned + // without completing initialization. + Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) } // NamedCreater is an object that can create an instance of a RESTful object using a name parameter. @@ -186,8 +187,9 @@ type NamedCreater interface { // Create creates a new version of a resource. It expects a name parameter from the path. // This is needed for create operations on subresources which include the name of the parent - // resource in the path. - Create(ctx genericapirequest.Context, name string, obj runtime.Object) (runtime.Object, error) + // resource in the path. If includeUninitialized is set, the object may be returned without + // completing initialization. + Create(ctx genericapirequest.Context, name string, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) } // UpdatedObjectInfo provides information about an updated object to an Updater. diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go index be2fb94fe38..f1e2a1bad12 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/resttest/resttest.go @@ -251,7 +251,7 @@ func (t *Tester) testCreateAlreadyExisting(obj runtime.Object, createFn CreateFu } defer t.delete(ctx, foo) - _, err := t.storage.(rest.Creater).Create(ctx, foo) + _, err := t.storage.(rest.Creater).Create(ctx, foo, false) if !errors.IsAlreadyExists(err) { t.Errorf("expected already exists err, got %v", err) } @@ -263,7 +263,7 @@ func (t *Tester) testCreateEquals(obj runtime.Object, getFn GetFunc) { foo := copyOrDie(obj, t.scheme) t.setObjectMeta(foo, t.namer(2)) - created, err := t.storage.(rest.Creater).Create(ctx, foo) + created, err := t.storage.(rest.Creater).Create(ctx, foo, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -291,7 +291,7 @@ func (t *Tester) testCreateDiscardsObjectNamespace(valid runtime.Object) { objectMeta.SetNamespace("not-default") // Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted - created, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme)) + created, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -307,7 +307,7 @@ func (t *Tester) testCreateGeneratesName(valid runtime.Object) { objectMeta.SetName("") objectMeta.SetGenerateName("test-") - created, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + created, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -322,7 +322,7 @@ func (t *Tester) testCreateHasMetadata(valid runtime.Object) { objectMeta.SetName(t.namer(1)) objectMeta.SetNamespace(t.TestNamespace()) - obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -340,7 +340,7 @@ func (t *Tester) testCreateIgnoresContextNamespace(valid runtime.Object) { ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2") // Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted - created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme)) + created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -359,7 +359,7 @@ func (t *Tester) testCreateIgnoresMismatchedNamespace(valid runtime.Object) { ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), "not-default2") // Ideally, we'd get an error back here, but at least verify the namespace wasn't persisted - created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme)) + created, err := t.storage.(rest.Creater).Create(ctx, copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -377,7 +377,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) { objCopyMeta.SetName(invalidName) ctx := t.TestContext() - _, err := t.storage.(rest.Creater).Create(ctx, objCopy) + _, err := t.storage.(rest.Creater).Create(ctx, objCopy, false) if !errors.IsInvalid(err) { t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidName, err) } @@ -389,7 +389,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) { objCopyMeta.SetName(objCopyMeta.GetName() + invalidSuffix) ctx := t.TestContext() - _, err := t.storage.(rest.Creater).Create(ctx, objCopy) + _, err := t.storage.(rest.Creater).Create(ctx, objCopy, false) if !errors.IsInvalid(err) { t.Errorf("%s: Expected to get an invalid resource error, got '%v'", invalidSuffix, err) } @@ -399,7 +399,7 @@ func (t *Tester) testCreateValidatesNames(valid runtime.Object) { func (t *Tester) testCreateInvokesValidation(invalid ...runtime.Object) { for i, obj := range invalid { ctx := t.TestContext() - _, err := t.storage.(rest.Creater).Create(ctx, obj) + _, err := t.storage.(rest.Creater).Create(ctx, obj, false) if !errors.IsInvalid(err) { t.Errorf("%d: Expected to get an invalid resource error, got %v", i, err) } @@ -410,7 +410,7 @@ func (t *Tester) testCreateRejectsMismatchedNamespace(valid runtime.Object) { objectMeta := t.getObjectMetaOrFail(valid) objectMeta.SetNamespace("not-default") - _, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + _, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err == nil { t.Errorf("Expected an error, but we didn't get one") } else if !strings.Contains(err.Error(), "does not match the namespace sent on the request") { @@ -424,7 +424,7 @@ func (t *Tester) testCreateResetsUserData(valid runtime.Object) { objectMeta.SetUID("bad-uid") objectMeta.SetCreationTimestamp(now) - obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid) + obj, err := t.storage.(rest.Creater).Create(t.TestContext(), valid, false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -442,7 +442,7 @@ func (t *Tester) testCreateIgnoreClusterName(valid runtime.Object) { objectMeta.SetName(t.namer(3)) objectMeta.SetClusterName("clustername-to-ignore") - obj, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme)) + obj, err := t.storage.(rest.Creater).Create(t.TestContext(), copyOrDie(valid, t.scheme), false) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1071,14 +1071,14 @@ func (t *Tester) testGetDifferentNamespace(obj runtime.Object) { ctx1 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar3") objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1)) - _, err := t.storage.(rest.Creater).Create(ctx1, obj) + _, err := t.storage.(rest.Creater).Create(ctx1, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } ctx2 := genericapirequest.WithNamespace(genericapirequest.NewContext(), "bar4") objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx2)) - _, err = t.storage.(rest.Creater).Create(ctx2, obj) + _, err = t.storage.(rest.Creater).Create(ctx2, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1112,7 +1112,7 @@ func (t *Tester) testGetFound(obj runtime.Object) { ctx := t.TestContext() t.setObjectMeta(obj, t.namer(1)) - existing, err := t.storage.(rest.Creater).Create(ctx, obj) + existing, err := t.storage.(rest.Creater).Create(ctx, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1135,7 +1135,7 @@ func (t *Tester) testGetMimatchedNamespace(obj runtime.Object) { objMeta := t.getObjectMetaOrFail(obj) objMeta.SetName(t.namer(4)) objMeta.SetNamespace(genericapirequest.NamespaceValue(ctx1)) - _, err := t.storage.(rest.Creater).Create(ctx1, obj) + _, err := t.storage.(rest.Creater).Create(ctx1, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1154,7 +1154,7 @@ func (t *Tester) testGetMimatchedNamespace(obj runtime.Object) { func (t *Tester) testGetNotFound(obj runtime.Object) { ctx := t.TestContext() t.setObjectMeta(obj, t.namer(2)) - _, err := t.storage.(rest.Creater).Create(ctx, obj) + _, err := t.storage.(rest.Creater).Create(ctx, obj, false) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go index 760f4fc3da7..0a86aefa078 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go @@ -41,7 +41,7 @@ type AdmissionOptions struct { func NewAdmissionOptions() *AdmissionOptions { options := &AdmissionOptions{ Plugins: &admission.Plugins{}, - PluginNames: []string{}, + PluginNames: []string{"Initializers"}, } server.RegisterAllAdmissionPlugins(options.Plugins) return options diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index ac5e0160822..5cc9e93e152 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -62,8 +62,8 @@ type CacherConfig struct { // KeyFunc is used to get a key in the underlying storage for a given object. KeyFunc func(runtime.Object) (string, error) - // GetAttrsFunc is used to get object labels and fields. - GetAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) + // GetAttrsFunc is used to get object labels, fields, and the uninitialized bool + GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) // TriggerPublisherFunc is used for optimizing amount of watchers that // needs to process an incoming event. @@ -131,7 +131,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) { } } -type watchFilterFunc func(string, labels.Set, fields.Set) bool +type watchFilterFunc func(key string, l labels.Set, f fields.Set, uninitialized bool) bool // Cacher is responsible for serving WATCH and LIST requests for a given // resource from its internal cache and updating its cache in the background @@ -658,11 +658,11 @@ func filterFunction(key string, p SelectionPredicate) func(string, runtime.Objec } func watchFilterFunction(key string, p SelectionPredicate) watchFilterFunc { - filterFunc := func(objKey string, label labels.Set, field fields.Set) bool { + filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool { if !hasPathPrefix(objKey, key) { return false } - return p.MatchesLabelsAndFields(label, field) + return p.MatchesObjectAttributes(label, field, uninitialized) } return filterFunc } @@ -840,10 +840,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) { // NOTE: sendWatchCacheEvent is assumed to not modify !!! func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { - curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) + curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized) oldObjPasses := false if event.PrevObject != nil { - oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields) + oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized) } if !curObjPasses && !oldObjPasses { // Watcher is not interested in that object. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go index 9a80b5f0da1..b8973d0b685 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go @@ -37,7 +37,7 @@ import ( func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { var lock sync.RWMutex count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, bool) bool { return true } forget := func(bool) { lock.Lock() defer lock.Unlock() @@ -61,7 +61,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } func TestCacheWatcherHandlesFiltering(t *testing.T) { - filter := func(_ string, _ labels.Set, field fields.Set) bool { + filter := func(_ string, _ labels.Set, field fields.Set, _ bool) bool { return field["spec.nodeName"] == "host" } forget := func(bool) {} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go index 9cc29673509..c6b046a4a76 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper_test.go @@ -249,9 +249,9 @@ func TestListFiltered(t *testing.T) { p := storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil + return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, } var got example.PodList diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 77f397e8046..cc1d76c7789 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -285,9 +285,9 @@ func TestGetToList(t *testing.T) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, expectedOut: nil, @@ -644,9 +644,9 @@ func TestList(t *testing.T) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, expectedOut: nil, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index cb172564d3d..50b46371021 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -118,7 +118,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re resultChan: make(chan watch.Event, outgoingBufSize), errChan: make(chan error, 1), } - if pred.Label.Empty() && pred.Field.Empty() { + if pred.Empty() { // The filter doesn't filter out any object. wc.internalFilter = nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 58cbfe4c29b..052e4dc73a9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -73,9 +73,9 @@ func testWatch(t *testing.T, recursive bool) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name=bar"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, }, { // update @@ -88,9 +88,9 @@ func testWatch(t *testing.T, recursive bool) { pred: storage.SelectionPredicate{ Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=bar"), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil }, }, }} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index e8181c3e848..74abfdc3e0c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -72,6 +72,8 @@ type FilterFunc func(obj runtime.Object) bool var Everything = SelectionPredicate{ Label: labels.Everything(), Field: fields.Everything(), + // TODO: split this into a new top level constant? + IncludeUninitialized: true, } // Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index 8878245d1f2..3a345303b7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -22,29 +22,33 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) -// AttrFunc returns label and field sets for List or Watch to match. +// AttrFunc returns label and field sets and the uninitialized flag for List or Watch to match. // In any failure to parse given object, it returns error. -type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, error) +type AttrFunc func(obj runtime.Object) (labels.Set, fields.Set, bool, error) // SelectionPredicate is used to represent the way to select objects from api storage. type SelectionPredicate struct { - Label labels.Selector - Field fields.Selector - GetAttrs AttrFunc - IndexFields []string + Label labels.Selector + Field fields.Selector + IncludeUninitialized bool + GetAttrs AttrFunc + IndexFields []string } // Matches returns true if the given object's labels and fields (as // returned by s.GetAttrs) match s.Label and s.Field. An error is // returned if s.GetAttrs fails. func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { - if s.Label.Empty() && s.Field.Empty() { + if s.Empty() { return true, nil } - labels, fields, err := s.GetAttrs(obj) + labels, fields, uninitialized, err := s.GetAttrs(obj) if err != nil { return false, err } + if !s.IncludeUninitialized && uninitialized { + return false, nil + } matched := s.Label.Matches(labels) if matched && s.Field != nil { matched = (matched && s.Field.Matches(fields)) @@ -52,9 +56,12 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { return matched, nil } -// MatchesLabelsAndFields returns true if the given labels and fields +// MatchesObjectAttributes returns true if the given labels and fields // match s.Label and s.Field. -func (s *SelectionPredicate) MatchesLabelsAndFields(l labels.Set, f fields.Set) bool { +func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set, uninitialized bool) bool { + if !s.IncludeUninitialized && uninitialized { + return false + } if s.Label.Empty() && s.Field.Empty() { return true } @@ -93,10 +100,11 @@ func (s *SelectionPredicate) RemoveMatchesSingleRequirements() (SelectionPredica } } return SelectionPredicate{ - Label: s.Label, - Field: fieldsSelector, - GetAttrs: s.GetAttrs, - IndexFields: s.IndexFields, + Label: s.Label, + Field: fieldsSelector, + IncludeUninitialized: s.IncludeUninitialized, + GetAttrs: s.GetAttrs, + IndexFields: s.IndexFields, }, nil } @@ -113,3 +121,8 @@ func (s *SelectionPredicate) MatcherIndex() []MatchValue { } return result } + +// Empty returns true if the predicate performs no filtering. +func (s *SelectionPredicate) Empty() bool { + return s.Label.Empty() && s.Field.Empty() && s.IncludeUninitialized +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go index d51666e21be..3c5da649a63 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate_test.go @@ -42,6 +42,7 @@ func TestSelectionPredicate(t *testing.T) { labelSelector, fieldSelector string labels labels.Set fields fields.Set + uninitialized bool err error shouldMatch bool matchSingleKey string @@ -74,6 +75,14 @@ func TestSelectionPredicate(t *testing.T) { shouldMatch: true, matchSingleKey: "12345", }, + "E": { + fieldSelector: "metadata.name=12345", + labels: labels.Set{}, + fields: fields.Set{"metadata.name": "12345"}, + uninitialized: true, + shouldMatch: false, + matchSingleKey: "12345", + }, "error": { labelSelector: "name=foo", fieldSelector: "uid=12345", @@ -94,8 +103,8 @@ func TestSelectionPredicate(t *testing.T) { sp := &SelectionPredicate{ Label: parsedLabel, Field: parsedField, - GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, err error) { - return item.labels, item.fields, item.err + GetAttrs: func(runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) { + return item.labels, item.fields, item.uninitialized, item.err }, } got, err := sp.Matches(&Ignored{}) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 619c3ea8368..ef0c4f38de7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -61,12 +61,12 @@ func init() { } // GetAttrs returns labels and fields of a given object for filtering purposes. -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { pod, ok := obj.(*example.Pod) if !ok { - return nil, nil, fmt.Errorf("not a pod") + return nil, nil, false, fmt.Errorf("not a pod") } - return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil + return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), pod.Initializers != nil, nil } // PodToSelectableFields returns a field set that represents the object @@ -469,12 +469,12 @@ func TestFiltering(t *testing.T) { pred := storage.SelectionPredicate{ Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}), Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) { + GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, uninitialized bool, err error) { metadata, err := meta.Accessor(obj) if err != nil { t.Fatalf("Unexpected error: %v", err) } - return labels.Set(metadata.GetLabels()), nil, nil + return labels.Set(metadata.GetLabels()), nil, metadata.GetInitializers() != nil, nil }, } watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go index beca63488d8..1268b9d7a51 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go @@ -47,15 +47,17 @@ const ( // the previous value of the object to enable proper filtering in the // upper layers. type watchCacheEvent struct { - Type watch.EventType - Object runtime.Object - ObjLabels labels.Set - ObjFields fields.Set - PrevObject runtime.Object - PrevObjLabels labels.Set - PrevObjFields fields.Set - Key string - ResourceVersion uint64 + Type watch.EventType + Object runtime.Object + ObjLabels labels.Set + ObjFields fields.Set + ObjUninitialized bool + PrevObject runtime.Object + PrevObjLabels labels.Set + PrevObjFields fields.Set + PrevObjUninitialized bool + Key string + ResourceVersion uint64 } // Computing a key of an object is generally non-trivial (it performs @@ -102,7 +104,7 @@ type watchCache struct { keyFunc func(runtime.Object) (string, error) // getAttrsFunc is used to get labels and fields of an object. - getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) + getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error) // cache is used a cyclic buffer - its first element (with the smallest // resourceVersion) is defined by startIndex, its last element is defined @@ -136,7 +138,7 @@ type watchCache struct { func newWatchCache( capacity int, keyFunc func(runtime.Object) (string, error), - getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)) *watchCache { + getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache { wc := &watchCache{ capacity: capacity, keyFunc: keyFunc, @@ -229,30 +231,33 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if err != nil { return err } - objLabels, objFields, err := w.getAttrsFunc(event.Object) + objLabels, objFields, objUninitialized, err := w.getAttrsFunc(event.Object) if err != nil { return err } var prevObject runtime.Object var prevObjLabels labels.Set var prevObjFields fields.Set + var prevObjUninitialized bool if exists { prevObject = previous.(*storeElement).Object - prevObjLabels, prevObjFields, err = w.getAttrsFunc(prevObject) + prevObjLabels, prevObjFields, prevObjUninitialized, err = w.getAttrsFunc(prevObject) if err != nil { return err } } watchCacheEvent := &watchCacheEvent{ - Type: event.Type, - Object: event.Object, - ObjLabels: objLabels, - ObjFields: objFields, - PrevObject: prevObject, - PrevObjLabels: prevObjLabels, - PrevObjFields: prevObjFields, - Key: key, - ResourceVersion: resourceVersion, + Type: event.Type, + Object: event.Object, + ObjLabels: objLabels, + ObjFields: objFields, + ObjUninitialized: objUninitialized, + PrevObject: prevObject, + PrevObjLabels: prevObjLabels, + PrevObjFields: prevObjFields, + PrevObjUninitialized: prevObjUninitialized, + Key: key, + ResourceVersion: resourceVersion, } if w.onEvent != nil { w.onEvent(watchCacheEvent) @@ -425,17 +430,18 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w if !ok { return nil, fmt.Errorf("not a storeElement: %v", elem) } - objLabels, objFields, err := w.getAttrsFunc(elem.Object) + objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object) if err != nil { return nil, err } result[i] = &watchCacheEvent{ - Type: watch.Added, - Object: elem.Object, - ObjLabels: objLabels, - ObjFields: objFields, - Key: elem.Key, - ResourceVersion: w.resourceVersion, + Type: watch.Added, + Object: elem.Object, + ObjLabels: objLabels, + ObjFields: objFields, + ObjUninitialized: objUninitialized, + Key: elem.Key, + ResourceVersion: w.resourceVersion, } } return result, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go index 744326d5ba6..9f8a360e00e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go @@ -50,8 +50,8 @@ func newTestWatchCache(capacity int) *watchCache { keyFunc := func(obj runtime.Object) (string, error) { return NamespaceKeyFunc("prefix", obj) } - getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) { - return nil, nil, nil + getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + return nil, nil, false, nil } wc := newWatchCache(capacity, keyFunc, getAttrsFunc) wc.clock = clock.NewFakeClock(time.Now()) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go index a07d5439426..fdcb6637f76 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go @@ -113,12 +113,12 @@ func (apiServerStatusStrategy) ValidateUpdate(ctx genericapirequest.Context, obj return validation.ValidateAPIServiceStatusUpdate(obj.(*apiregistration.APIService), old.(*apiregistration.APIService)) } -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { apiserver, ok := obj.(*apiregistration.APIService) if !ok { - return nil, nil, fmt.Errorf("given object is not a APIService.") + return nil, nil, false, fmt.Errorf("given object is not a APIService.") } - return labels.Set(apiserver.ObjectMeta.Labels), APIServiceToSelectableFields(apiserver), nil + return labels.Set(apiserver.ObjectMeta.Labels), APIServiceToSelectableFields(apiserver), apiserver.Initializers != nil, nil } // MatchAPIService is the filter used by the generic etcd backend to watch events diff --git a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go index c2b739f3cec..bafe064ac1c 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go +++ b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresource/strategy.go @@ -83,12 +83,12 @@ func (CustomResourceDefinitionStorageStrategy) ValidateUpdate(ctx genericapirequ return validation.ValidateObjectMetaAccessorUpdate(objAccessor, oldAccessor, field.NewPath("metadata")) } -func (a CustomResourceDefinitionStorageStrategy) GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func (a CustomResourceDefinitionStorageStrategy) GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { accessor, err := meta.Accessor(obj) if err != nil { - return nil, nil, err + return nil, nil, false, err } - return labels.Set(accessor.GetLabels()), objectMetaFieldsSet(accessor, a.namespaceScoped), nil + return labels.Set(accessor.GetLabels()), objectMetaFieldsSet(accessor, a.namespaceScoped), accessor.GetInitializers() != nil, nil } // objectMetaFieldsSet returns a fields that represent the ObjectMeta. diff --git a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go index 07e77c5204d..e802102bbe6 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go +++ b/staging/src/k8s.io/kube-apiextensions-server/pkg/registry/customresourcedefinition/strategy.go @@ -107,12 +107,12 @@ func (statusStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old run return validation.ValidateUpdateCustomResourceDefinitionStatus(obj.(*apiextensions.CustomResourceDefinition), old.(*apiextensions.CustomResourceDefinition)) } -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { apiserver, ok := obj.(*apiextensions.CustomResourceDefinition) if !ok { - return nil, nil, fmt.Errorf("given object is not a CustomResourceDefinition.") + return nil, nil, false, fmt.Errorf("given object is not a CustomResourceDefinition.") } - return labels.Set(apiserver.ObjectMeta.Labels), CustomResourceDefinitionToSelectableFields(apiserver), nil + return labels.Set(apiserver.ObjectMeta.Labels), CustomResourceDefinitionToSelectableFields(apiserver), apiserver.Initializers != nil, nil } // MatchCustomResourceDefinition is the filter used by the generic etcd backend to watch events diff --git a/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go b/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go index 12b1a6a8172..24a4cb16cb3 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/registry/wardle/strategy.go @@ -71,12 +71,12 @@ func (apiServerStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old // return validation.ValidateFlunderUpdate(obj.(*wardle.Flunder), old.(*wardle.Flunder)) } -func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, bool, error) { apiserver, ok := obj.(*wardle.Flunder) if !ok { - return nil, nil, fmt.Errorf("given object is not a Flunder.") + return nil, nil, false, fmt.Errorf("given object is not a Flunder.") } - return labels.Set(apiserver.ObjectMeta.Labels), FlunderToSelectableFields(apiserver), nil + return labels.Set(apiserver.ObjectMeta.Labels), FlunderToSelectableFields(apiserver), apiserver.Initializers != nil, nil } // MatchFlunder is the filter used by the generic etcd backend to watch events diff --git a/test/e2e/BUILD b/test/e2e/BUILD index 5155a8bd6f4..1896139378e 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -23,6 +23,7 @@ go_test( "//pkg/metrics:go_default_library", "//test/e2e/autoscaling:go_default_library", "//test/e2e/cluster-logging:go_default_library", + "//test/e2e/extension:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e/perf:go_default_library", "//test/e2e/scheduling:go_default_library", @@ -234,6 +235,7 @@ filegroup( "//test/e2e/chaosmonkey:all-srcs", "//test/e2e/cluster-logging:all-srcs", "//test/e2e/common:all-srcs", + "//test/e2e/extension:all-srcs", "//test/e2e/framework:all-srcs", "//test/e2e/generated:all-srcs", "//test/e2e/perf:all-srcs", diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 0d382763696..fe4672e9051 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -21,6 +21,7 @@ import ( _ "k8s.io/kubernetes/test/e2e/autoscaling" _ "k8s.io/kubernetes/test/e2e/cluster-logging" + _ "k8s.io/kubernetes/test/e2e/extension" "k8s.io/kubernetes/test/e2e/framework" _ "k8s.io/kubernetes/test/e2e/perf" _ "k8s.io/kubernetes/test/e2e/scheduling" diff --git a/test/e2e/extension/BUILD b/test/e2e/extension/BUILD new file mode 100644 index 00000000000..bc9bf9064a4 --- /dev/null +++ b/test/e2e/extension/BUILD @@ -0,0 +1,36 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["initializers.go"], + tags = ["automanaged"], + deps = [ + "//pkg/api/v1:go_default_library", + "//test/e2e/framework:go_default_library", + "//vendor/github.com/onsi/ginkgo:go_default_library", + "//vendor/github.com/onsi/gomega:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/test/e2e/extension/initializers.go b/test/e2e/extension/initializers.go new file mode 100644 index 00000000000..49922ac2b3f --- /dev/null +++ b/test/e2e/extension/initializers.go @@ -0,0 +1,121 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package extension + +import ( + "fmt" + "strings" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/test/e2e/framework" +) + +var _ = framework.KubeDescribe("Initializers", func() { + f := framework.NewDefaultFramework("initializers") + + // TODO: Add failure traps once we have JustAfterEach + // See https://github.com/onsi/ginkgo/issues/303 + + It("should be invisible to controllers by default", func() { + ns := f.Namespace.Name + c := f.ClientSet + + podName := "uninitialized-pod" + framework.Logf("Creating pod %s", podName) + + ch := make(chan struct{}) + go func() { + _, err := c.Core().Pods(ns).Create(newUninitializedPod(podName)) + Expect(err).NotTo(HaveOccurred()) + close(ch) + }() + + // wait to ensure the scheduler does not act on an uninitialized pod + err := wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) { + p, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, err + } + return len(p.Spec.NodeName) > 0, nil + }) + Expect(err).To(Equal(wait.ErrWaitTimeout)) + + // verify that we can update an initializing pod + pod, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + pod.Annotations = map[string]string{"update-1": "test"} + pod, err = c.Core().Pods(ns).Update(pod) + Expect(err).NotTo(HaveOccurred()) + + // clear initializers + pod.Initializers = nil + pod, err = c.Core().Pods(ns).Update(pod) + Expect(err).NotTo(HaveOccurred()) + + // pod should now start running + err = framework.WaitForPodRunningInNamespace(c, pod) + Expect(err).NotTo(HaveOccurred()) + + // ensure create call returns + <-ch + + // verify that we cannot start the pod initializing again + pod, err = c.Core().Pods(ns).Get(podName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + pod.Initializers = &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Other"}}, + } + _, err = c.Core().Pods(ns).Update(pod) + if !errors.IsInvalid(err) || !strings.Contains(err.Error(), "immutable") { + Fail(fmt.Sprintf("expected invalid error: %v", err)) + } + }) + +}) + +func newUninitializedPod(podName string) *v1.Pod { + containerName := fmt.Sprintf("%s-container", podName) + port := 8080 + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{{Name: "Test"}}, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: containerName, + Image: "gcr.io/google_containers/porter:4524579c0eb935c056c8e75563b4e1eda31587e0", + Env: []v1.EnvVar{{Name: fmt.Sprintf("SERVE_PORT_%d", port), Value: "foo"}}, + Ports: []v1.ContainerPort{{ContainerPort: int32(port)}}, + }, + }, + RestartPolicy: v1.RestartPolicyNever, + }, + } + return pod +}