Fix a bug where the ResourceQuota admission plugin does not respect any scope change when a resource is updated, e.g. setting or unsetting an existing pod's activeDeadlineSeconds field, which moves the pod between the Terminating and NotTerminating quota scopes.

carlory 2025-02-10 14:30:50 +08:00
parent 69ab91a5c5
commit eb0f003d25
6 changed files with 267 additions and 25 deletions
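For context, a pod's membership in the Terminating / NotTerminating quota scopes is determined by spec.activeDeadlineSeconds, so an update that sets or clears that field moves the pod from one scoped quota to the other. Previously the admission plugin only evaluated pod quota on Create, so such updates were never charged. A minimal sketch of the user-level scenario this fix addresses (the helper name and client-go wiring are illustrative, not part of this commit):

```go
// Sketch (not part of this commit): an update that flips an existing pod's
// quota scope from NotTerminating to Terminating. Before this fix, the
// ResourceQuota admission plugin ignored such updates, so the
// terminating-scoped quota was never charged for them.
package quotademo

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/utils/ptr"
)

// flipToTerminating is a hypothetical helper used only for illustration.
func flipToTerminating(ctx context.Context, cs kubernetes.Interface, ns, name string) error {
	pod, err := cs.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	// Setting spec.activeDeadlineSeconds makes the pod match the Terminating scope.
	pod.Spec.ActiveDeadlineSeconds = ptr.To[int64](3600)
	_, err = cs.CoreV1().Pods(ns).Update(ctx, pod, metav1.UpdateOptions{})
	return err
}
```

The admission-plugin, quota-controller, and e2e changes below all exercise exactly this transition.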

View File

@ -170,6 +170,17 @@ func (p *podEvaluator) Handles(a admission.Attributes) bool {
op := a.GetOperation()
switch a.GetSubresource() {
case "":
if op == admission.Update {
pod, err1 := toExternalPodOrError(a.GetObject())
oldPod, err2 := toExternalPodOrError(a.GetOldObject())
if err1 != nil || err2 != nil {
return false
}
// when scope changed
if IsTerminating(oldPod) != IsTerminating(pod) {
return true
}
}
return op == admission.Create
case "resize":
return op == admission.Update
@ -333,9 +344,9 @@ func podMatchesScopeFunc(selector corev1.ScopedResourceSelectorRequirement, obje
}
switch selector.ScopeName {
case corev1.ResourceQuotaScopeTerminating:
-return isTerminating(pod), nil
+return IsTerminating(pod), nil
case corev1.ResourceQuotaScopeNotTerminating:
-return !isTerminating(pod), nil
+return !IsTerminating(pod), nil
case corev1.ResourceQuotaScopeBestEffort:
return isBestEffort(pod), nil
case corev1.ResourceQuotaScopeNotBestEffort:
@ -393,7 +404,7 @@ func isBestEffort(pod *corev1.Pod) bool {
return qos.GetPodQOS(pod) == corev1.PodQOSBestEffort
}
-func isTerminating(pod *corev1.Pod) bool {
+func IsTerminating(pod *corev1.Pod) bool {
if pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds >= int64(0) {
return true
}
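isTerminating is exported as IsTerminating so that other packages touched by this commit (the quota controller's update filter, shown in a later file below) can reuse the same scope check. A minimal usage sketch, assuming the evaluator's in-tree import path k8s.io/kubernetes/pkg/quota/v1/evaluator/core:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/utils/ptr"

	// Assumed in-tree import path for the pod evaluator shown above.
	core "k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
)

func main() {
	longRunning := &corev1.Pod{}
	terminating := &corev1.Pod{
		Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](3600)},
	}

	// Scope membership is driven solely by spec.activeDeadlineSeconds (>= 0).
	fmt.Println(core.IsTerminating(longRunning)) // false -> NotTerminating scope
	fmt.Println(core.IsTerminating(terminating)) // true  -> Terminating scope
}
```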

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/util/node"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
)
func TestPodConstraintsFunc(t *testing.T) {
@ -1297,6 +1298,21 @@ func TestPodEvaluatorHandles(t *testing.T) {
attrs: admission.NewAttributesRecord(nil, nil, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Create, nil, false, nil),
want: true,
},
{
name: "update-activeDeadlineSeconds-to-nil",
attrs: admission.NewAttributesRecord(&corev1.Pod{}, &corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](1)}}, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),
want: true,
},
{
name: "update-activeDeadlineSeconds-from-nil",
attrs: admission.NewAttributesRecord(&corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](1)}}, &corev1.Pod{}, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),
want: true,
},
{
name: "update-activeDeadlineSeconds-with-different-values",
attrs: admission.NewAttributesRecord(&corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](1)}}, &corev1.Pod{Spec: corev1.PodSpec{ActiveDeadlineSeconds: ptr.To[int64](2)}}, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),
want: false,
},
{
name: "update",
attrs: admission.NewAttributesRecord(nil, nil, schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, "", "", schema.GroupVersionResource{Group: "core", Version: "v1", Resource: "pods"}, "", admission.Update, nil, false, nil),

View File

@ -38,6 +38,12 @@ func DefaultUpdateFilter() func(resource schema.GroupVersionResource, oldObj, ne
if feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) && hasResourcesChanged(oldPod, newPod) {
return true
}
// when scope changed
if core.IsTerminating(oldPod) != core.IsTerminating(newPod) {
return true
}
return core.QuotaV1Pod(oldPod, clock.RealClock{}) && !core.QuotaV1Pod(newPod, clock.RealClock{})
case schema.GroupResource{Resource: "services"}:
oldService := oldObj.(*v1.Service)

View File

@ -860,6 +860,128 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
}
}
// TestAdmitBelowTerminatingQuotaLimitWhenPodScopeUpdated ensures that terminating pods are charged to the right quota.
// It creates a terminating and non-terminating quota, and changes an existing pod to be terminating.
// It ensures that the terminating quota is incremented, and the non-terminating quota is not.
//
// The quota admission plugin is intended to fail "high" in this case, and it depends on a controller reconciling actual
// persisted use to lower/free the reserved quota. The admission plugin must always overcount when something later causes
// the request to be rejected, so quota cannot be reduced by requests that are never completed.
func TestAdmitBelowTerminatingQuotaLimitWhenPodScopeUpdated(t *testing.T) {
resourceQuotaNonTerminating := &corev1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota-non-terminating", Namespace: "test", ResourceVersion: "124"},
Spec: corev1.ResourceQuotaSpec{
Scopes: []corev1.ResourceQuotaScope{corev1.ResourceQuotaScopeNotTerminating},
},
Status: corev1.ResourceQuotaStatus{
Hard: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
corev1.ResourceMemory: resource.MustParse("100Gi"),
corev1.ResourcePods: resource.MustParse("5"),
},
Used: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("50Gi"),
corev1.ResourcePods: resource.MustParse("3"),
},
},
}
resourceQuotaTerminating := &corev1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota-terminating", Namespace: "test", ResourceVersion: "124"},
Spec: corev1.ResourceQuotaSpec{
Scopes: []corev1.ResourceQuotaScope{corev1.ResourceQuotaScopeTerminating},
},
Status: corev1.ResourceQuotaStatus{
Hard: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
corev1.ResourceMemory: resource.MustParse("100Gi"),
corev1.ResourcePods: resource.MustParse("5"),
},
Used: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("50Gi"),
corev1.ResourcePods: resource.MustParse("3"),
},
},
}
stopCh := make(chan struct{})
defer close(stopCh)
kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating)
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
handler, err := createHandler(kubeClient, informerFactory, stopCh)
if err != nil {
t.Errorf("Error occurred while creating admission plugin: %v", err)
}
err = informerFactory.Core().V1().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaNonTerminating)
if err != nil {
t.Errorf("Error occurred while adding resource quota to the indexer: %v", err)
}
err = informerFactory.Core().V1().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaTerminating)
if err != nil {
t.Errorf("Error occurred while adding resource quota to the indexer: %v", err)
}
// old pod belonged to the non-terminating scope, but updated version belongs to the terminating scope
existingPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
existingPod.ResourceVersion = "1"
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
activeDeadlineSeconds := int64(30)
newPod.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
err = handler.Validate(context.TODO(), admission.NewAttributesRecord(newPod, existingPod, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, corev1.Resource("pods").WithVersion("version"), "", admission.Update, &metav1.CreateOptions{}, false, nil), nil)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(kubeClient.Actions()) == 0 {
t.Errorf("Expected a client action")
}
expectedActionSet := sets.NewString(
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
decimatedActions := removeListWatch(kubeClient.Actions())
lastActionIndex := len(decimatedActions) - 1
usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*corev1.ResourceQuota)
// ensure only the quota-terminating was updated
if usage.Name != resourceQuotaTerminating.Name {
t.Errorf("Incremented the wrong quota, expected %v, actual %v", resourceQuotaTerminating.Name, usage.Name)
}
expectedUsage := corev1.ResourceQuota{
Status: corev1.ResourceQuotaStatus{
Hard: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
corev1.ResourceMemory: resource.MustParse("100Gi"),
corev1.ResourcePods: resource.MustParse("5"),
},
Used: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1100m"),
corev1.ResourceMemory: resource.MustParse("52Gi"),
corev1.ResourcePods: resource.MustParse("4"),
},
},
}
for k, v := range expectedUsage.Status.Used {
actual := usage.Status.Used[k]
actualValue := actual.String()
expectedValue := v.String()
if expectedValue != actualValue {
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
}
}
}
// TestAdmitBelowBestEffortQuotaLimit creates a best effort and non-best effort quota.
// It verifies that best effort pods are properly scoped to the best effort quota document.
func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {

View File

@ -492,16 +492,26 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat
// as a result, we need to measure the usage of this object for quota
// on updates, we need to subtract the previous measured usage
// if usage shows no change, just return since it has no impact on quota
-deltaUsage, err := evaluator.Usage(inputObject)
+inputUsage, err := evaluator.Usage(inputObject)
if err != nil {
return quotas, err
}
// ensure that usage for input object is never negative (this would mean a resource made a negative resource requirement)
-if negativeUsage := quota.IsNegative(deltaUsage); len(negativeUsage) > 0 {
+if negativeUsage := quota.IsNegative(inputUsage); len(negativeUsage) > 0 {
return nil, admission.NewForbidden(a, fmt.Errorf("quota usage is negative for resource(s): %s", prettyPrintResourceNames(negativeUsage)))
}
// initialize a map of delta usage for each interesting quota index.
deltaUsageIndexMap := make(map[int]corev1.ResourceList, len(interestingQuotaIndexes))
for _, index := range interestingQuotaIndexes {
deltaUsageIndexMap[index] = inputUsage
}
var deltaUsageWhenNoInterestingQuota corev1.ResourceList
if admission.Create == a.GetOperation() && len(interestingQuotaIndexes) == 0 {
deltaUsageWhenNoInterestingQuota = inputUsage
}
if admission.Update == a.GetOperation() {
prevItem := a.GetOldObject()
if prevItem == nil {
@ -511,20 +521,55 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat
// if we can definitively determine that this is not a case of "create on update",
// then charge based on the delta. Otherwise, bill the maximum
metadata, err := meta.Accessor(prevItem)
-if err == nil && len(metadata.GetResourceVersion()) > 0 {
-prevUsage, innerErr := evaluator.Usage(prevItem)
-if innerErr != nil {
-return quotas, innerErr
+if err == nil {
+if len(metadata.GetResourceVersion()) > 0 {
+prevUsage, innerErr := evaluator.Usage(prevItem)
+if innerErr != nil {
+return quotas, innerErr
+}
+deltaUsage := quota.SubtractWithNonNegativeResult(inputUsage, prevUsage)
+if len(interestingQuotaIndexes) == 0 {
+deltaUsageWhenNoInterestingQuota = deltaUsage
+}
+for _, index := range interestingQuotaIndexes {
+resourceQuota := quotas[index]
+match, err := evaluator.Matches(&resourceQuota, prevItem)
+if err != nil {
+klog.ErrorS(err, "Error occurred while matching resource quota against the existing object",
+"resourceQuota", resourceQuota)
+return quotas, err
+}
+if match {
+deltaUsageIndexMap[index] = deltaUsage
+}
+}
+} else if len(interestingQuotaIndexes) == 0 {
+deltaUsageWhenNoInterestingQuota = inputUsage
+}
-deltaUsage = quota.SubtractWithNonNegativeResult(deltaUsage, prevUsage)
}
}
-// ignore items in deltaUsage with zero usage
-deltaUsage = quota.RemoveZeros(deltaUsage)
+// ignore items in deltaUsageIndexMap with zero usage,
+// as they will not impact the quota.
+for index := range deltaUsageIndexMap {
+deltaUsageIndexMap[index] = quota.RemoveZeros(deltaUsageIndexMap[index])
+if len(deltaUsageIndexMap[index]) == 0 {
+delete(deltaUsageIndexMap, index)
+}
+}
// if there is no remaining non-zero usage, short-circuit and return
-if len(deltaUsage) == 0 {
-return quotas, nil
+if len(interestingQuotaIndexes) != 0 {
+if len(deltaUsageIndexMap) == 0 {
+return quotas, nil
+}
+} else {
+deltaUsage := quota.RemoveZeros(deltaUsageWhenNoInterestingQuota)
+if len(deltaUsage) == 0 {
+return quotas, nil
+}
+}
// verify that for every resource that had limited by default consumption
@ -557,6 +602,10 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat
for _, index := range interestingQuotaIndexes {
resourceQuota := outQuotas[index]
deltaUsage, ok := deltaUsageIndexMap[index]
if !ok {
continue
}
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
requestedUsage := quota.Mask(deltaUsage, hardResources)
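Taken together, the CheckRequest changes replace the single deltaUsage value with a per-quota map: on an update, a quota that the old object already matched is charged only the non-negative difference between new and old usage, while a quota the old object did not match (because the scope changed) is charged the new object's full usage. A simplified sketch of that decision, using stand-in types (ResourceList, subtractNonNegative, deltaFor are illustrative names, not the real evaluator or quota helper APIs):

```go
package quotasketch

// ResourceList is a simplified stand-in for corev1.ResourceList.
type ResourceList map[string]int64

// subtractNonNegative mirrors quota.SubtractWithNonNegativeResult:
// per-key difference, clamped at zero.
func subtractNonNegative(a, b ResourceList) ResourceList {
	out := ResourceList{}
	for k, v := range a {
		d := v - b[k]
		if d < 0 {
			d = 0
		}
		out[k] = d
	}
	return out
}

// deltaFor returns the usage charged to one quota for an update, given
// whether the old object matched that quota's scopes.
func deltaFor(newUsage, oldUsage ResourceList, oldMatchedQuota bool) ResourceList {
	if oldMatchedQuota {
		// The quota already accounts for the old object; charge only the growth.
		return subtractNonNegative(newUsage, oldUsage)
	}
	// Scope changed: this quota has never seen the object, so charge it in full.
	return newUsage
}
```

This is what TestAdmitBelowTerminatingQuotaLimitWhenPodScopeUpdated above relies on: the terminating quota, which the old pod never matched, is charged the pod's full requests (100m CPU, 2Gi memory, one pod), while the non-terminating quota is not touched by admission and is left for the quota controller to reconcile.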

View File

@ -1367,16 +1367,23 @@ var _ = SIGDescribe("ResourceQuota", func() {
err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaNotTerminating.Name, usedResources)
framework.ExpectNoError(err)
ginkgo.By("Creating a long running pod")
podName := "test-pod"
requests := v1.ResourceList{}
requests[v1.ResourceCPU] = resource.MustParse("500m")
requests[v1.ResourceMemory] = resource.MustParse("200Mi")
limits := v1.ResourceList{}
limits[v1.ResourceCPU] = resource.MustParse("1")
limits[v1.ResourceMemory] = resource.MustParse("400Mi")
-pod := newTestPodForQuota(f, podName, requests, limits)
-_, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
+podName1 := "test-pod"
+pod1 := newTestPodForQuota(f, podName1, requests, limits)
+podName2 := "terminating-pod"
+pod2 := newTestPodForQuota(f, podName2, requests, limits)
+activeDeadlineSeconds := int64(3600)
+pod2.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
+ginkgo.By("Creating a long running pod")
+_, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod1, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Ensuring resource quota with not terminating scope captures the pod usage")
@ -1397,8 +1404,42 @@ var _ = SIGDescribe("ResourceQuota", func() {
err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaTerminating.Name, usedResources)
framework.ExpectNoError(err)
ginkgo.By("Updating the pod to have an active deadline")
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
gotPod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, podName1, metav1.GetOptions{})
if err != nil {
return err
}
gotPod.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
_, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Update(ctx, gotPod, metav1.UpdateOptions{})
return err
})
framework.ExpectNoError(err)
ginkgo.By("Creating second terminating pod")
_, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod2, metav1.CreateOptions{})
gomega.Expect(err).To(gomega.MatchError(apierrors.IsForbidden, "expect a forbidden error when creating a Pod that exceeds quota"))
ginkgo.By("Ensuring resource quota with terminating scope captures the pod usage")
usedResources[v1.ResourcePods] = resource.MustParse("1")
usedResources[v1.ResourceRequestsCPU] = requests[v1.ResourceCPU]
usedResources[v1.ResourceRequestsMemory] = requests[v1.ResourceMemory]
usedResources[v1.ResourceLimitsCPU] = limits[v1.ResourceCPU]
usedResources[v1.ResourceLimitsMemory] = limits[v1.ResourceMemory]
err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaTerminating.Name, usedResources)
framework.ExpectNoError(err)
ginkgo.By("Ensuring resource quota with not terminating scope ignored the pod usage")
usedResources[v1.ResourcePods] = resource.MustParse("0")
usedResources[v1.ResourceRequestsCPU] = resource.MustParse("0")
usedResources[v1.ResourceRequestsMemory] = resource.MustParse("0")
usedResources[v1.ResourceLimitsCPU] = resource.MustParse("0")
usedResources[v1.ResourceLimitsMemory] = resource.MustParse("0")
err = waitForResourceQuota(ctx, f.ClientSet, f.Namespace.Name, resourceQuotaNotTerminating.Name, usedResources)
framework.ExpectNoError(err)
ginkgo.By("Deleting the pod")
-err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName, *metav1.NewDeleteOptions(0))
+err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName1, *metav1.NewDeleteOptions(0))
framework.ExpectNoError(err)
ginkgo.By("Ensuring resource quota status released the pod usage")
@ -1411,11 +1452,7 @@ var _ = SIGDescribe("ResourceQuota", func() {
framework.ExpectNoError(err)
ginkgo.By("Creating a terminating pod")
podName = "terminating-pod"
pod = newTestPodForQuota(f, podName, requests, limits)
activeDeadlineSeconds := int64(3600)
pod.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
_, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
_, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod2, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Ensuring resource quota with terminating scope captures the pod usage")
@ -1437,7 +1474,7 @@ var _ = SIGDescribe("ResourceQuota", func() {
framework.ExpectNoError(err)
ginkgo.By("Deleting the pod")
-err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName, *metav1.NewDeleteOptions(0))
+err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(ctx, podName2, *metav1.NewDeleteOptions(0))
framework.ExpectNoError(err)
ginkgo.By("Ensuring resource quota status released the pod usage")
@ -1874,6 +1911,7 @@ func newTestResourceQuotaWithScopeSelector(name string, scope v1.ResourceQuotaSc
hard[v1.ResourcePods] = resource.MustParse("5")
switch scope {
case v1.ResourceQuotaScopeTerminating, v1.ResourceQuotaScopeNotTerminating:
hard[v1.ResourcePods] = resource.MustParse("1")
hard[v1.ResourceRequestsCPU] = resource.MustParse("1")
hard[v1.ResourceRequestsMemory] = resource.MustParse("500Mi")
hard[v1.ResourceLimitsCPU] = resource.MustParse("2")