diff --git a/pkg/kubeapiserver/admission/util/initializer.go b/pkg/kubeapiserver/admission/util/initializer.go index 165c4ee9ba8..d20ba439c5a 100644 --- a/pkg/kubeapiserver/admission/util/initializer.go +++ b/pkg/kubeapiserver/admission/util/initializer.go @@ -18,11 +18,12 @@ package util import ( "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/util/initialization" "k8s.io/apiserver/pkg/admission" ) -// IsUpdatingInitializedObject returns if the operation is trying to update an -// already initialized object. +// IsUpdatingInitializedObject returns true if the operation is trying to update +// an already initialized object. func IsUpdatingInitializedObject(a admission.Attributes) (bool, error) { if a.GetOperation() != admission.Update { return false, nil @@ -32,14 +33,14 @@ func IsUpdatingInitializedObject(a admission.Attributes) (bool, error) { if err != nil { return false, err } - if accessor.GetInitializers() == nil { + if initialization.IsInitialized(accessor.GetInitializers()) { return true, nil } return false, nil } -// IsUpdatingUninitializedObject returns if the operation is trying to update an -// object that is not initialized yet. +// IsUpdatingUninitializedObject returns true if the operation is trying to +// update an object that is not initialized yet. func IsUpdatingUninitializedObject(a admission.Attributes) (bool, error) { if a.GetOperation() != admission.Update { return false, nil @@ -49,8 +50,30 @@ func IsUpdatingUninitializedObject(a admission.Attributes) (bool, error) { if err != nil { return false, err } - if accessor.GetInitializers() == nil { + if initialization.IsInitialized(accessor.GetInitializers()) { return false, nil } return true, nil } + +// IsInitializationCompletion returns true if the operation removes all pending +// initializers. +func IsInitializationCompletion(a admission.Attributes) (bool, error) { + if a.GetOperation() != admission.Update { + return false, nil + } + oldObj := a.GetOldObject() + oldInitialized, err := initialization.IsObjectInitialized(oldObj) + if err != nil { + return false, err + } + if oldInitialized { + return false, nil + } + newObj := a.GetObject() + newInitialized, err := initialization.IsObjectInitialized(newObj) + if err != nil { + return false, err + } + return newInitialized, nil +} diff --git a/pkg/quota/evaluator/core/persistent_volume_claims.go b/pkg/quota/evaluator/core/persistent_volume_claims.go index c7a3a61ea44..9759c492e68 100644 --- a/pkg/quota/evaluator/core/persistent_volume_claims.go +++ b/pkg/quota/evaluator/core/persistent_volume_claims.go @@ -25,15 +25,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/initialization" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/helper" k8s_api_v1 "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/features" + k8sfeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubeapiserver/admission/util" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/generic" @@ -149,17 +152,26 @@ func (p *pvcEvaluator) Handles(a admission.Attributes) bool { if op == admission.Create { return true } - if op == admission.Update && utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { - return true + if op == admission.Update && utilfeature.DefaultFeatureGate.Enabled(k8sfeatures.ExpandPersistentVolumes) { + initialized, err := initialization.IsObjectInitialized(a.GetObject()) + if err != nil { + // fail closed, will try to give an evaluation. + utilruntime.HandleError(err) + return true + } + // only handle the update if the object is initialized after the update. + return initialized } - - updateUninitialized, err := util.IsUpdatingUninitializedObject(a) + // TODO: when the ExpandPersistentVolumes feature gate is removed, remove + // the initializationCompletion check as well, because it will become a + // subset of the "initialized" condition. + initializationCompletion, err := util.IsInitializationCompletion(a) if err != nil { // fail closed, will try to give an evaluation. + utilruntime.HandleError(err) return true } - // only uninitialized pvc might be updated. - return updateUninitialized + return initializationCompletion } // Matches returns true if the evaluator matches the specified quota with the provided input item @@ -194,10 +206,16 @@ func (p *pvcEvaluator) Usage(item runtime.Object) (api.ResourceList, error) { if err != nil { return result, err } - storageClassRef := helper.GetPersistentVolumeClaimClass(pvc) // charge for claim result[api.ResourcePersistentVolumeClaims] = resource.MustParse("1") + if utilfeature.DefaultFeatureGate.Enabled(features.Initializers) { + if !initialization.IsInitialized(pvc.Initializers) { + // Only charge pvc count for uninitialized pvc. + return result, nil + } + } + storageClassRef := helper.GetPersistentVolumeClaimClass(pvc) if len(storageClassRef) > 0 { storageClassClaim := api.ResourceName(storageClassRef + storageClassSuffix + string(api.ResourcePersistentVolumeClaims)) result[storageClassClaim] = resource.MustParse("1") diff --git a/pkg/quota/evaluator/core/pods.go b/pkg/quota/evaluator/core/pods.go index 0417bb8eccb..7272ed5e26c 100644 --- a/pkg/quota/evaluator/core/pods.go +++ b/pkg/quota/evaluator/core/pods.go @@ -25,9 +25,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/initialization" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/api" @@ -135,19 +139,20 @@ func (p *podEvaluator) GroupKind() schema.GroupKind { return api.Kind("Pod") } -// Handles returns true of the evaluator should handle the specified attributes. +// Handles returns true if the evaluator should handle the specified attributes. func (p *podEvaluator) Handles(a admission.Attributes) bool { op := a.GetOperation() if op == admission.Create { return true } - updateUninitialized, err := util.IsUpdatingUninitializedObject(a) + initializationCompletion, err := util.IsInitializationCompletion(a) if err != nil { // fail closed, will try to give an evaluation. + utilruntime.HandleError(err) return true } // only uninitialized pods might be updated. - return updateUninitialized + return initializationCompletion } // Matches returns true if the evaluator matches the specified quota with the provided input item @@ -254,10 +259,19 @@ func PodUsageFunc(obj runtime.Object) (api.ResourceList, error) { if err != nil { return api.ResourceList{}, err } + // by convention, we do not quota pods that have reached an end-of-life state if !QuotaPod(pod) { return api.ResourceList{}, nil } + // Only charge pod count for uninitialized pod. + if utilfeature.DefaultFeatureGate.Enabled(features.Initializers) { + if !initialization.IsInitialized(pod.Initializers) { + result := api.ResourceList{} + result[api.ResourcePods] = resource.MustParse("1") + return result, nil + } + } requests := api.ResourceList{} limits := api.ResourceList{} // TODO: fix this when we have pod level cgroups diff --git a/staging/src/k8s.io/apimachinery/pkg/util/initialization/initialization.go b/staging/src/k8s.io/apimachinery/pkg/util/initialization/initialization.go new file mode 100644 index 00000000000..341b5955748 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/initialization/initialization.go @@ -0,0 +1,48 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package initialization + +import ( + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// IsInitialized returns if the initializers indicates means initialized. +func IsInitialized(initializers *metav1.Initializers) bool { + if initializers == nil { + return true + } + // Persisted objects will never be in this state. The initializer admission + // plugin will override metadata.initializers to nil. If the initializer + // admissio plugin is disabled, the generic registry always set + // metadata.initializers to nil. However, this function + // might be called before the object persisted, thus the check. + if len(initializers.Pending) == 0 && initializers.Result == nil { + return true + } + return false +} + +// IsObjectInitialized returns if the object is initialized. +func IsObjectInitialized(obj runtime.Object) (bool, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return false, err + } + return IsInitialized(accessor.GetInitializers()), nil +} diff --git a/test/e2e/resource_quota.go b/test/e2e/resource_quota.go index 0e87c16758c..1a58a07055a 100644 --- a/test/e2e/resource_quota.go +++ b/test/e2e/resource_quota.go @@ -168,42 +168,45 @@ var _ = framework.KubeDescribe("ResourceQuota", func() { requests[v1.ResourceMemory] = resource.MustParse("252Mi") pod := newTestPodForQuota(f, podName, requests, v1.ResourceList{}) pod.Initializers = &metav1.Initializers{Pending: []metav1.Initializer{{Name: "unhandled"}}} - _, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) + pod, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) // because no one is handling the initializer, server will return a 504 timeout if err != nil && !errors.IsTimeout(err) { framework.Failf("expect err to be timeout error, got %v", err) } - podToUpdate, err := f.ClientSet.Core().Pods(f.Namespace.Name).Get(podName, metav1.GetOptions{}) + createdPod, err := f.ClientSet.Core().Pods(f.Namespace.Name).Get(podName, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred()) - By("Ensuring ResourceQuota status captures the pod usage") + By("Ensuring only pod count is charged") + usedResources = v1.ResourceList{} usedResources[v1.ResourceQuotas] = resource.MustParse("1") usedResources[v1.ResourcePods] = resource.MustParse("1") - usedResources[v1.ResourceCPU] = requests[v1.ResourceCPU] - usedResources[v1.ResourceMemory] = requests[v1.ResourceMemory] err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) Expect(err).NotTo(HaveOccurred()) - By("Not allowing an uninitialized pod to be created that exceeds remaining quota") - requests = v1.ResourceList{} - requests[v1.ResourceCPU] = resource.MustParse("600m") - requests[v1.ResourceMemory] = resource.MustParse("100Mi") - pod = newTestPodForQuota(f, "fail-pod", requests, v1.ResourceList{}) - pod.Initializers = &metav1.Initializers{Pending: []metav1.Initializer{{Name: "unhandled"}}} - pod, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) - Expect(err).To(HaveOccurred()) - fmt.Println("CHAO: err=", err) - By("Ensuring an uninitialized pod can update its resource requirements") // a pod cannot dynamically update its resource requirements. requests = v1.ResourceList{} requests[v1.ResourceCPU] = resource.MustParse("100m") requests[v1.ResourceMemory] = resource.MustParse("100Mi") - podToUpdate.Spec.Containers[0].Resources.Requests = requests - _, err = f.ClientSet.Core().Pods(f.Namespace.Name).Update(podToUpdate) + _, err = framework.UpdatePodWithRetries(f.ClientSet, f.Namespace.Name, createdPod.Name, func(p *v1.Pod) { + p.Spec.Containers[0].Resources.Requests = requests + }) Expect(err).NotTo(HaveOccurred()) - By("Ensuring attempts to update pod resource requirements did change quota usage") + By("Ensuring ResourceQuota status doesn't change") + usedResources = v1.ResourceList{} + usedResources[v1.ResourceQuotas] = resource.MustParse("1") + usedResources[v1.ResourcePods] = resource.MustParse("1") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) + + By("Allowing initializing a Pod that fits quota") + _, err = framework.UpdatePodWithRetries(f.ClientSet, f.Namespace.Name, createdPod.Name, func(p *v1.Pod) { + p.Initializers = nil + }) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring ResourceQuota status captures the usage of the intialized pod") usedResources[v1.ResourceQuotas] = resource.MustParse("1") usedResources[v1.ResourcePods] = resource.MustParse("1") usedResources[v1.ResourceCPU] = requests[v1.ResourceCPU] @@ -211,20 +214,64 @@ var _ = framework.KubeDescribe("ResourceQuota", func() { err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) Expect(err).NotTo(HaveOccurred()) - // TODO: uncomment the test after 51247 is merged, in which the - // replenishment_controller uses the sharedInformer that list/watches - // uninitialized objects. - // By("Deleting the pod") - // err = f.ClientSet.Core().Pods(f.Namespace.Name).Delete(podName, metav1.NewDeleteOptions(0)) - // Expect(err).NotTo(HaveOccurred()) - // - // By("Ensuring resource quota status released the pod usage") - // usedResources[v1.ResourceQuotas] = resource.MustParse("1") - // usedResources[v1.ResourcePods] = resource.MustParse("0") - // usedResources[v1.ResourceCPU] = resource.MustParse("0") - // usedResources[v1.ResourceMemory] = resource.MustParse("0") - // err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) - // Expect(err).NotTo(HaveOccurred()) + By("Deleting the pod") + err = f.ClientSet.Core().Pods(f.Namespace.Name).Delete(createdPod.Name, metav1.NewDeleteOptions(0)) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring resource quota status released the pod usage") + usedResources[v1.ResourceQuotas] = resource.MustParse("1") + usedResources[v1.ResourcePods] = resource.MustParse("0") + usedResources[v1.ResourceCPU] = resource.MustParse("0") + usedResources[v1.ResourceMemory] = resource.MustParse("0") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) + + By("Allowing creating an uninitialized pod that exceeds remaining quota") + requests = v1.ResourceList{} + requests[v1.ResourceCPU] = resource.MustParse("1100m") + requests[v1.ResourceMemory] = resource.MustParse("100Mi") + podName = "too-large-pod" + pod = newTestPodForQuota(f, podName, requests, v1.ResourceList{}) + pod.Initializers = &metav1.Initializers{Pending: []metav1.Initializer{{Name: "unhandled"}}} + _, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod) + // because no one is handling the initializer, server will return a 504 timeout + if err != nil && !errors.IsTimeout(err) { + framework.Failf("expect err to be timeout error, got %v", err) + } + createdPod, err = f.ClientSet.Core().Pods(f.Namespace.Name).Get(podName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring only charges pod count") + usedResources = v1.ResourceList{} + usedResources[v1.ResourceQuotas] = resource.MustParse("1") + usedResources[v1.ResourcePods] = resource.MustParse("1") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) + + By("Disallowing initializing a Pod that doesn't fit quota") + _, err = framework.UpdatePodWithRetries(f.ClientSet, f.Namespace.Name, createdPod.Name, func(p *v1.Pod) { + p.Initializers = nil + }) + Expect(err).To(HaveOccurred()) + + By("Ensuring ResourceQuota status doesn't change") + usedResources = v1.ResourceList{} + usedResources[v1.ResourceQuotas] = resource.MustParse("1") + usedResources[v1.ResourcePods] = resource.MustParse("1") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) + + By("Deleting the pod") + err = f.ClientSet.Core().Pods(f.Namespace.Name).Delete(createdPod.Name, metav1.NewDeleteOptions(0)) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring ResourceQuota status doesn't change") + usedResources = v1.ResourceList{} + usedResources[v1.ResourceQuotas] = resource.MustParse("1") + // TODO: This is a bug. We need 51247 to fix it. + usedResources[v1.ResourcePods] = resource.MustParse("1") + err = waitForResourceQuota(f.ClientSet, f.Namespace.Name, quotaName, usedResources) + Expect(err).NotTo(HaveOccurred()) }) It("should create a ResourceQuota and capture the life of a pod.", func() {