From eb0f003d25209c850b47a275358aea53252274b4 Mon Sep 17 00:00:00 2001
From: carlory
Date: Mon, 10 Feb 2025 14:30:50 +0800
Subject: [PATCH] Fix a bug where the `ResourceQuota` admission plugin does not
 respect any scope change when a resource is being updated, e.g. setting or
 unsetting an existing pod's `activeDeadlineSeconds` field.

---
 pkg/quota/v1/evaluator/core/pods.go           |  17 ++-
 pkg/quota/v1/evaluator/core/pods_test.go      |  16 +++
 pkg/quota/v1/install/update_filter.go         |   6 +
 .../admission/resourcequota/admission_test.go | 122 ++++++++++++++++++
 .../plugin/resourcequota/controller.go        |  71 ++++++++--
 test/e2e/apimachinery/resource_quota.go       |  60 +++++++--
 6 files changed, 267 insertions(+), 25 deletions(-)

diff --git a/pkg/quota/v1/evaluator/core/pods.go b/pkg/quota/v1/evaluator/core/pods.go
index 8efdec6cc2b..8f91588db2e 100644
--- a/pkg/quota/v1/evaluator/core/pods.go
+++ b/pkg/quota/v1/evaluator/core/pods.go
@@ -170,6 +170,17 @@ func (p *podEvaluator) Handles(a admission.Attributes) bool {
 	op := a.GetOperation()
 	switch a.GetSubresource() {
 	case "":
+		if op == admission.Update {
+			pod, err1 := toExternalPodOrError(a.GetObject())
+			oldPod, err2 := toExternalPodOrError(a.GetOldObject())
+			if err1 != nil || err2 != nil {
+				return false
+			}
+			// when scope changed
+			if IsTerminating(oldPod) != IsTerminating(pod) {
+				return true
+			}
+		}
 		return op == admission.Create
 	case "resize":
 		return op == admission.Update
@@ -333,9 +344,9 @@ func podMatchesScopeFunc(selector corev1.ScopedResourceSelectorRequirement, obje
 	}
 	switch selector.ScopeName {
 	case corev1.ResourceQuotaScopeTerminating:
-		return isTerminating(pod), nil
+		return IsTerminating(pod), nil
 	case corev1.ResourceQuotaScopeNotTerminating:
-		return !isTerminating(pod), nil
+		return !IsTerminating(pod), nil
 	case corev1.ResourceQuotaScopeBestEffort:
 		return isBestEffort(pod), nil
 	case corev1.ResourceQuotaScopeNotBestEffort:
@@ -393,7 +404,7 @@ func isBestEffort(pod *corev1.Pod) bool {
 	return qos.GetPodQOS(pod) == corev1.PodQOSBestEffort
 }
 
-func isTerminating(pod *corev1.Pod) bool {
+func IsTerminating(pod *corev1.Pod) bool {
 	if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds >= int64(0) {
 		return true
 	}
diff --git a/pkg/quota/v1/evaluator/core/pods_test.go b/pkg/quota/v1/evaluator/core/pods_test.go
index 15f9945f29c..b7c99e416f2 100644
--- a/pkg/quota/v1/evaluator/core/pods_test.go
+++ b/pkg/quota/v1/evaluator/core/pods_test.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/utils/clock"
 	testingclock "k8s.io/utils/clock/testing"
+	"k8s.io/utils/ptr"
 )
 
 func TestPodConstraintsFunc(t *testing.T) {
@@ -1297,6 +1298,21 @@ func TestPodEvaluatorHandles(t *testing.T) {
 			attrs: admission.NewAttributesRecord(nil, nil, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Create, nil, false, nil),
 			want: true,
 		},
+		{
+			name: "update-activeDeadlineSeconds-to-nil",
+			attrs: admission.NewAttributesRecord(&corev1.Pod{}, &corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](1)}}, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),
+			want: true,
+		},
+		{
+			name: "update-activeDeadlineSeconds-from-nil",
+			attrs: admission.NewAttributesRecord(&corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](1)}}, &corev1.Pod{}, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),
+			want: true,
+		},
+		{
+			name: "update-activeDeadlineSeconds-with-different-values",
+			attrs: admission.NewAttributesRecord(&corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](1)}}, &corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](2)}}, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),
+			want: false,
+		},
 		{
 			name: "update",
 			attrs: admission.NewAttributesRecord(nil, nil, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),
diff --git a/pkg/quota/v1/install/update_filter.go b/pkg/quota/v1/install/update_filter.go
index 507ff05ad87..9a4ce799727 100644
--- a/pkg/quota/v1/install/update_filter.go
+++ b/pkg/quota/v1/install/update_filter.go
@@ -38,6 +38,12 @@ func DefaultUpdateFilter() func(resource schema.GroupVersionResource, oldObj, ne
 			if feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && hasResourcesChanged(oldPod, newPod) {
 				return true
 			}
+
+			// when scope changed
+			if core.IsTerminating(oldPod) != core.IsTerminating(newPod) {
+				return true
+			}
+
 			return core.QuotaV1Pod(oldPod, clock.RealClock{}) && !core.QuotaV1Pod(newPod, clock.RealClock{})
 		case schema.GroupResource{Resource: "services"}:
 			oldService := oldObj.(*v1.Service)
diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go
index 2f49cfdb8b2..080f52ec5f3 100644
--- a/plugin/pkg/admission/resourcequota/admission_test.go
+++ b/plugin/pkg/admission/resourcequota/admission_test.go
@@ -860,6 +860,128 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
 	}
 }
 
+// TestAdmitBelowTerminatingQuotaLimitWhenPodScopeUpdated ensures that terminating pods are charged to the right quota.
+// It creates a terminating and a non-terminating quota, and changes an existing pod to be terminating.
+// It ensures that the terminating quota is incremented, and the non-terminating quota is not.
+//
+// The quota admission plugin is intended to fail "high" in this case, and depends on a controller reconciling the
+// actual persisted usage to lower or free the reserved quota. We always need to overcount in the admission plugin in
+// case something later causes the request to be rejected, so you cannot reduce quota with requests that aren't completed.
+func TestAdmitBelowTerminatingQuotaLimitWhenPodScopeUpdated(t *testing.T) { + resourceQuotaNonTerminating := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota-non-terminating", Namespace: "test", ResourceVersion: "124"}, + Spec: corev1.ResourceQuotaSpec{ + Scopes: []corev1.ResourceQuotaScope{corev1.ResourceQuotaScopeNotTerminating}, + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + corev1.ResourcePods: resource.MustParse("5"), + }, + Used: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("50Gi"), + corev1.ResourcePods: resource.MustParse("3"), + }, + }, + } + resourceQuotaTerminating := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota-terminating", Namespace: "test", ResourceVersion: "124"}, + Spec: corev1.ResourceQuotaSpec{ + Scopes: []corev1.ResourceQuotaScope{corev1.ResourceQuotaScopeTerminating}, + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + corev1.ResourcePods: resource.MustParse("5"), + }, + Used: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("50Gi"), + corev1.ResourcePods: resource.MustParse("3"), + }, + }, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating) + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + + handler, err := createHandler(kubeClient, informerFactory, stopCh) + if err != nil { + t.Errorf("Error occurred while creating admission plugin: %v", err) + } + + err = informerFactory.Core().V1().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaNonTerminating) + if err != nil { + t.Errorf("Error occurred while adding resource quota to the indexer: %v", err) + } + err = informerFactory.Core().V1().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaTerminating) + if err != nil { + t.Errorf("Error occurred while adding resource quota to the indexer: %v", err) + } + + // old pod belonged to the non-terminating scope, but updated version belongs to the terminating scope + existingPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) + existingPod.ResourceVersion = "1" + newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) + activeDeadlineSeconds := int64(30) + newPod.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds + err = handler.Validate(context.TODO(), admission.NewAttributesRecord(newPod, existingPod, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, corev1.Resource("pods").WithVersion("version"), "", admission.Update, &metav1.CreateOptions{}, false, nil), nil) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if len(kubeClient.Actions()) == 0 { + t.Errorf("Expected a client action") + } + + expectedActionSet := sets.NewString( + strings.Join([]string{"update", "resourcequotas", "status"}, "-"), + ) + actionSet := sets.NewString() + for _, action := range kubeClient.Actions() { + actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-")) + } + if !actionSet.HasAll(expectedActionSet.List()...) 
{ + t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) + } + + decimatedActions := removeListWatch(kubeClient.Actions()) + lastActionIndex := len(decimatedActions) - 1 + usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*corev1.ResourceQuota) + + // ensure only the quota-terminating was updated + if usage.Name != resourceQuotaTerminating.Name { + t.Errorf("Incremented the wrong quota, expected %v, actual %v", resourceQuotaTerminating.Name, usage.Name) + } + + expectedUsage := corev1.ResourceQuota{ + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + corev1.ResourcePods: resource.MustParse("5"), + }, + Used: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1100m"), + corev1.ResourceMemory: resource.MustParse("52Gi"), + corev1.ResourcePods: resource.MustParse("4"), + }, + }, + } + for k, v := range expectedUsage.Status.Used { + actual := usage.Status.Used[k] + actualValue := actual.String() + expectedValue := v.String() + if expectedValue != actualValue { + t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + } + } +} + // TestAdmitBelowBestEffortQuotaLimit creates a best effort and non-best effort quota. // It verifies that best effort pods are properly scoped to the best effort quota document. func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/controller.go index 9a54c40b24f..a49692a4140 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/controller.go @@ -492,16 +492,26 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat // as a result, we need to measure the usage of this object for quota // on updates, we need to subtract the previous measured usage // if usage shows no change, just return since it has no impact on quota - deltaUsage, err := evaluator.Usage(inputObject) + inputUsage, err := evaluator.Usage(inputObject) if err != nil { return quotas, err } // ensure that usage for input object is never negative (this would mean a resource made a negative resource requirement) - if negativeUsage := quota.IsNegative(deltaUsage); len(negativeUsage) > 0 { + if negativeUsage := quota.IsNegative(inputUsage); len(negativeUsage) > 0 { return nil, admission.NewForbidden(a, fmt.Errorf("quota usage is negative for resource(s): %s", prettyPrintResourceNames(negativeUsage))) } + // initialize a map of delta usage for each interesting quota index. + deltaUsageIndexMap := make(map[int]corev1.ResourceList, len(interestingQuotaIndexes)) + for _, index := range interestingQuotaIndexes { + deltaUsageIndexMap[index] = inputUsage + } + var deltaUsageWhenNoInterestingQuota corev1.ResourceList + if admission.Create == a.GetOperation() && len(interestingQuotaIndexes) == 0 { + deltaUsageWhenNoInterestingQuota = inputUsage + } + if admission.Update == a.GetOperation() { prevItem := a.GetOldObject() if prevItem == nil { @@ -511,20 +521,55 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat // if we can definitively determine that this is not a case of "create on update", // then charge based on the delta. 
Otherwise, bill the maximum metadata, err := meta.Accessor(prevItem) - if err == nil && len(metadata.GetResourceVersion()) > 0 { - prevUsage, innerErr := evaluator.Usage(prevItem) - if innerErr != nil { - return quotas, innerErr + if err == nil { + if len(metadata.GetResourceVersion()) > 0 { + prevUsage, innerErr := evaluator.Usage(prevItem) + if innerErr != nil { + return quotas, innerErr + } + + deltaUsage := quota.SubtractWithNonNegativeResult(inputUsage, prevUsage) + if len(interestingQuotaIndexes) == 0 { + deltaUsageWhenNoInterestingQuota = deltaUsage + } + + for _, index := range interestingQuotaIndexes { + resourceQuota := quotas[index] + match, err := evaluator.Matches(&resourceQuota, prevItem) + if err != nil { + klog.ErrorS(err, "Error occurred while matching resource quota against the existing object", + "resourceQuota", resourceQuota) + return quotas, err + } + if match { + deltaUsageIndexMap[index] = deltaUsage + } + } + } else if len(interestingQuotaIndexes) == 0 { + deltaUsageWhenNoInterestingQuota = inputUsage } - deltaUsage = quota.SubtractWithNonNegativeResult(deltaUsage, prevUsage) } } - // ignore items in deltaUsage with zero usage - deltaUsage = quota.RemoveZeros(deltaUsage) + // ignore items in deltaUsageIndexMap with zero usage, + // as they will not impact the quota. + for index := range deltaUsageIndexMap { + deltaUsageIndexMap[index] = quota.RemoveZeros(deltaUsageIndexMap[index]) + if len(deltaUsageIndexMap[index]) == 0 { + delete(deltaUsageIndexMap, index) + } + } + // if there is no remaining non-zero usage, short-circuit and return - if len(deltaUsage) == 0 { - return quotas, nil + if len(interestingQuotaIndexes) != 0 { + if len(deltaUsageIndexMap) == 0 { + return quotas, nil + } + } else { + deltaUsage := quota.RemoveZeros(deltaUsageWhenNoInterestingQuota) + if len(deltaUsage) == 0 { + return quotas, nil + } } // verify that for every resource that had limited by default consumption @@ -557,6 +602,10 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat for _, index := range interestingQuotaIndexes { resourceQuota := outQuotas[index] + deltaUsage, ok := deltaUsageIndexMap[index] + if !ok { + continue + } hardResources := quota.ResourceNames(resourceQuota.Status.Hard) requestedUsage := quota.Mask(deltaUsage, hardResources) diff --git a/test/e2e/apimachinery/resource_quota.go b/test/e2e/apimachinery/resource_quota.go index 03a52576596..fcf8a3df77b 100644 --- a/test/e2e/apimachinery/resource_quota.go +++ b/test/e2e/apimachinery/resource_quota.go @@ -1367,16 +1367,23 @@ var _ = SIGDescribe("ResourceQuota", func() { err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaNotTerminating.Name, usedResources) framework.ExpectNoError(err) - ginkgo.By("Creating a long running pod") - podName := "test-pod" requests := v1.ResourceList{} requests[v1.ResourceCPU] = resource.MustParse("500m") requests[v1.ResourceMemory] = resource.MustParse("200Mi") limits := v1.ResourceList{} limits[v1.ResourceCPU] = resource.MustParse("1") limits[v1.ResourceMemory] = resource.MustParse("400Mi") - pod := newTestPodForQuota(f, podName, requests, limits) - _, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}) + + podName1 := "test-pod" + pod1 := newTestPodForQuota(f, podName1, requests, limits) + + podName2 := "terminating-pod" + pod2 := newTestPodForQuota(f, podName2, requests, limits) + activeDeadlineSeconds := int64(3600) + pod2.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds + + 
ginkgo.By("Creating a long running pod") + _, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod1, metav1.CreateOptions{}) framework.ExpectNoError(err) ginkgo.By("Ensuring resource quota with not terminating scope captures the pod usage") @@ -1397,8 +1404,42 @@ var _ = SIGDescribe("ResourceQuota", func() { err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaTerminating.Name, usedResources) framework.ExpectNoError(err) + ginkgo.By("Updating the pod to have an active deadline") + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + gotPod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, podName1, metav1.GetOptions{}) + if err != nil { + return err + } + gotPod.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds + _, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Update(ctx, gotPod, metav1.UpdateOptions{}) + return err + }) + framework.ExpectNoError(err) + + ginkgo.By("Creating second terminating pod") + _, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod2, metav1.CreateOptions{}) + gomega.Expect(err).To(gomega.MatchError(apierrors.IsForbidden, "expect a forbidden error when creating a Pod that exceeds quota")) + + ginkgo.By("Ensuring resource quota with terminating scope captures the pod usage") + usedResources[v1.ResourcePods] = resource.MustParse("1") + usedResources[v1.ResourceRequestsCPU] = requests[v1.ResourceCPU] + usedResources[v1.ResourceRequestsMemory] = requests[v1.ResourceMemory] + usedResources[v1.ResourceLimitsCPU] = limits[v1.ResourceCPU] + usedResources[v1.ResourceLimitsMemory] = limits[v1.ResourceMemory] + err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaTerminating.Name, usedResources) + framework.ExpectNoError(err) + + ginkgo.By("Ensuring resource quota with not terminating scope ignored the pod usage") + usedResources[v1.ResourcePods] = resource.MustParse("0") + usedResources[v1.ResourceRequestsCPU] = resource.MustParse("0") + usedResources[v1.ResourceRequestsMemory] = resource.MustParse("0") + usedResources[v1.ResourceLimitsCPU] = resource.MustParse("0") + usedResources[v1.ResourceLimitsMemory] = resource.MustParse("0") + err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaNotTerminating.Name, usedResources) + framework.ExpectNoError(err) + ginkgo.By("Deleting the pod") - err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName, *metav1.NewDeleteOptions(0)) + err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName1, *metav1.NewDeleteOptions(0)) framework.ExpectNoError(err) ginkgo.By("Ensuring resource quota status released the pod usage") @@ -1411,11 +1452,7 @@ var _ = SIGDescribe("ResourceQuota", func() { framework.ExpectNoError(err) ginkgo.By("Creating a terminating pod") - podName = "terminating-pod" - pod = newTestPodForQuota(f, podName, requests, limits) - activeDeadlineSeconds := int64(3600) - pod.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds - _, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}) + _, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod2, metav1.CreateOptions{}) framework.ExpectNoError(err) ginkgo.By("Ensuring resource quota with terminating scope captures the pod usage") @@ -1437,7 +1474,7 @@ var _ = SIGDescribe("ResourceQuota", func() { framework.ExpectNoError(err) ginkgo.By("Deleting the pod") - err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName, *metav1.NewDeleteOptions(0)) + err = 
f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName2, *metav1.NewDeleteOptions(0)) framework.ExpectNoError(err) ginkgo.By("Ensuring resource quota status released the pod usage") @@ -1874,6 +1911,7 @@ func newTestResourceQuotaWithScopeSelector(name string, scope v1.ResourceQuotaSc hard[v1.ResourcePods] = resource.MustParse("5") switch scope { case v1.ResourceQuotaScopeTerminating, v1.ResourceQuotaScopeNotTerminating: + hard[v1.ResourcePods] = resource.MustParse("1") hard[v1.ResourceRequestsCPU] = resource.MustParse("1") hard[v1.ResourceRequestsMemory] = resource.MustParse("500Mi") hard[v1.ResourceLimitsCPU] = resource.MustParse("2")
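Reviewer note (illustrative, not part of the patch): the short, self-contained Go sketch below shows the scope change this fix is about, using only the core/v1 Pod type. The helper names are made up for the example and simply mirror the evaluator's IsTerminating check: a pod counts against the Terminating quota scope exactly when spec.activeDeadlineSeconds is set to a non-negative value, so setting or clearing that field on an existing pod moves its usage between the Terminating and NotTerminating quotas, which is the kind of update the admission plugin previously ignored.

// scopechange_sketch.go: illustrative only, not part of the patch.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// isTerminating mirrors the quota evaluator's Terminating-scope rule:
// an active deadline of zero or more seconds marks the pod as terminating.
func isTerminating(pod *corev1.Pod) bool {
	return pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds >= 0
}

// scopeChanged reports whether an update moves a pod between the Terminating
// and NotTerminating scopes, the condition the patched Handles and
// DefaultUpdateFilter now treat as quota-relevant.
func scopeChanged(oldPod, newPod *corev1.Pod) bool {
	return isTerminating(oldPod) != isTerminating(newPod)
}

func main() {
	deadline := int64(3600)
	longRunning := &corev1.Pod{}                                                        // no deadline: NotTerminating scope
	withDeadline := &corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: &deadline}} // deadline set: Terminating scope

	fmt.Println(scopeChanged(longRunning, withDeadline))  // true: quota admission must re-evaluate this update
	fmt.Println(scopeChanged(withDeadline, withDeadline)) // false: the update is quota-neutral
}

With the patch, such an update is billed to the quota that matches the new pod state, and the resource quota controller's update filter triggers recalculation of the quota the pod no longer matches.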