From 32fdb551920270fd14263468235120c816f7be1b Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Tue, 10 Oct 2023 12:44:44 +0200 Subject: [PATCH] Use Patch instead of SSA for Pod Disruption condition --- pkg/controller/disruption/disruption.go | 23 ++-- .../nodelifecycle/scheduler/taint_manager.go | 27 ++--- pkg/controller/podgc/gc_controller.go | 33 ++---- pkg/controller/podgc/gc_controller_test.go | 8 +- .../framework/preemption/preemption.go | 35 +++--- .../rbac/bootstrappolicy/controller_policy.go | 2 +- .../testdata/controller-roles.yaml | 1 + test/integration/podgc/podgc_test.go | 111 ++++++++++++++++++ 8 files changed, 167 insertions(+), 73 deletions(-) diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index da13ebb23e3..2527e31480e 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - corev1apply "k8s.io/client-go/applyconfigurations/core/v1" "k8s.io/client-go/discovery" appsv1informers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" @@ -74,9 +73,6 @@ const ( // Once the timeout is reached, this controller attempts to set the status // of the condition to False. stalePodDisruptionTimeout = 2 * time.Minute - - // field manager used to disable the pod failure condition - fieldManager = "DisruptionController" ) type updater func(context.Context, *policy.PodDisruptionBudget) error @@ -770,16 +766,15 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key return nil } - podApply := corev1apply.Pod(pod.Name, pod.Namespace). - WithStatus(corev1apply.PodStatus()). - WithResourceVersion(pod.ResourceVersion) - podApply.Status.WithConditions(corev1apply.PodCondition(). - WithType(v1.DisruptionTarget). - WithStatus(v1.ConditionFalse). 
- WithLastTransitionTime(metav1.Now()), - ) - - if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { + newPod := pod.DeepCopy() + updated := apipod.UpdatePodCondition(&newPod.Status, &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionFalse, + }) + if !updated { + return nil + } + if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, newPod, metav1.UpdateOptions{}); err != nil { return err } logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod)) diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go index c5083902b2c..65e669e45b5 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -31,16 +31,17 @@ import ( "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/util/feature" - corev1apply "k8s.io/client-go/applyconfigurations/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/core/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" + utilpod "k8s.io/kubernetes/pkg/util/pod" "k8s.io/klog/v2" ) @@ -55,9 +56,6 @@ const ( UpdateWorkerSize = 8 podUpdateChannelSize = 1 retries = 5 - - // fieldManager used to add pod disruption condition when evicting pods due to NoExecute taint - fieldManager = "TaintManager" ) type nodeUpdateItem struct { @@ -127,16 +125,17 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, if err != nil { return err } - podApply := 
corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus()) - podApply.Status.WithConditions(corev1apply.PodCondition(). - WithType(v1.DisruptionTarget). - WithStatus(v1.ConditionTrue). - WithReason("DeletionByTaintManager"). - WithMessage("Taint manager: deleting due to NoExecute taint"). - WithLastTransitionTime(metav1.Now()), - ) - if _, err := c.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { - return err + newStatus := pod.Status.DeepCopy() + updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByTaintManager", + Message: "Taint manager: deleting due to NoExecute taint", + }) + if updated { + if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { + return err + } } } return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{}) diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index 81288dfd6ae..476c764bdc0 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -30,17 +30,18 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" - corev1apply "k8s.io/client-go/applyconfigurations/core/v1" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/podgc/metrics" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/eviction" nodeutil "k8s.io/kubernetes/pkg/util/node" + utilpod "k8s.io/kubernetes/pkg/util/pod" "k8s.io/kubernetes/pkg/util/taints" ) @@ -50,9 +51,6 @@ const ( // quarantineTime defines 
how long Orphaned GC waits for nodes to show up // in an informer before issuing a GET call to check if they are truly gone quarantineTime = 40 * time.Second - - // field manager used to add pod failure condition and change the pod phase - fieldManager = "PodGC" ) type PodGCController struct { @@ -249,12 +247,12 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node continue } logger.V(2).Info("Found orphaned Pod assigned to the Node, deleting", "pod", klog.KObj(pod), "node", klog.KRef("", pod.Spec.NodeName)) - condition := corev1apply.PodCondition(). - WithType(v1.DisruptionTarget). - WithStatus(v1.ConditionTrue). - WithReason("DeletionByPodGC"). - WithMessage("PodGC: node no longer exists"). - WithLastTransitionTime(metav1.Now()) + condition := &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByPodGC", + Message: "PodGC: node no longer exists", + } if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil { utilruntime.HandleError(err) metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc() @@ -341,7 +339,7 @@ func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1. return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil) } -func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *corev1apply.PodConditionApplyConfiguration) error { +func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error { logger := klog.FromContext(ctx) logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod)) // Patch the pod to make sure it is transitioned to the Failed phase before deletion. 
@@ -354,17 +352,12 @@ func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Cont // is orphaned, in which case the pod would remain in the Running phase // forever as there is no kubelet running to change the phase. if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed { - podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus()) - // we don't need to extract the pod apply configuration and can send - // only phase and the DisruptionTarget condition as PodGC would not - // own other fields. If the DisruptionTarget condition is owned by - // PodGC it means that it is in the Failed phase, so sending the - // condition will not be re-attempted. - podApply.Status.WithPhase(v1.PodFailed) + newStatus := pod.Status.DeepCopy() + newStatus.Phase = v1.PodFailed if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { - podApply.Status.WithConditions(condition) + apipod.UpdatePodCondition(newStatus, condition) } - if _, err := gcc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { + if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { return err } } diff --git a/pkg/controller/podgc/gc_controller_test.go b/pkg/controller/podgc/gc_controller_test.go index 4bc11255613..d78879b0307 100644 --- a/pkg/controller/podgc/gc_controller_test.go +++ b/pkg/controller/podgc/gc_controller_test.go @@ -714,16 +714,16 @@ func TestGCInspectingPatchedPodBeforeDeletion(t *testing.T) { Status: v1.PodStatus{ Phase: v1.PodFailed, Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, { Type: v1.DisruptionTarget, Status: v1.ConditionTrue, Reason: "DeletionByPodGC", Message: "PodGC: node no longer exists", }, - { - Type: v1.PodReady, - Status: v1.ConditionTrue, - }, }, }, }, diff 
--git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 5c0a35134a2..f2e91115fc1 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -26,16 +26,15 @@ import ( v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/util/feature" - corev1apply "k8s.io/client-go/applyconfigurations/core/v1" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" extenderv1 "k8s.io/kube-scheduler/extender/v1" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" @@ -43,11 +42,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/util" ) -const ( - // fieldManager used to add pod disruption condition to the victim pods - fieldManager = "KubeScheduler" -) - // Candidate represents a nominated node on which the preemptor can be scheduled, // along with the list of victims that should be evicted for the preemptor to fit the node. type Candidate interface { @@ -362,19 +356,20 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name()) } else { if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { - victimPodApply := corev1apply.Pod(victim.Name, victim.Namespace).WithStatus(corev1apply.PodStatus()) - victimPodApply.Status.WithConditions(corev1apply.PodCondition(). - WithType(v1.DisruptionTarget). - WithStatus(v1.ConditionTrue). - WithReason(v1.PodReasonPreemptionByScheduler). 
- WithMessage(fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName)). - WithLastTransitionTime(metav1.Now()), - ) - - if _, err := cs.CoreV1().Pods(victim.Namespace).ApplyStatus(ctx, victimPodApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { - logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) - errCh.SendErrorWithCancel(err, cancel) - return + condition := &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: v1.PodReasonPreemptionByScheduler, + Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName), + } + newStatus := victim.Status.DeepCopy() + updated := apipod.UpdatePodCondition(newStatus, condition) + if updated { + if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil { + logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) + errCh.SendErrorWithCancel(err, cancel) + return + } } } if err := util.DeletePod(ctx, cs, victim); err != nil { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 70716a9b348..983d41886e1 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -140,7 +140,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) }, } if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { - role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie()) + role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie()) } return role }()) diff --git 
a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 5493b907811..aa886f7017f 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -436,6 +436,7 @@ items: - pods/status verbs: - patch + - update - apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: diff --git a/test/integration/podgc/podgc_test.go b/test/integration/podgc/podgc_test.go index 236e350762d..da434f867db 100644 --- a/test/integration/podgc/podgc_test.go +++ b/test/integration/podgc/podgc_test.go @@ -309,6 +309,117 @@ func TestTerminatingOnOutOfServiceNode(t *testing.T) { } } +// TestPodGcForPodsWithDuplicatedFieldKeys regression test for https://issues.k8s.io/118261 +func TestPodGcForPodsWithDuplicatedFieldKeys(t *testing.T) { + tests := map[string]struct { + pod *v1.Pod + wantDisruptionTarget *v1.PodCondition + }{ + "Orphan pod with duplicated env vars": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Finalizers: []string{"test.k8s.io/finalizer"}, + }, + Spec: v1.PodSpec{ + NodeName: "non-existing-node", + Containers: []v1.Container{ + { + Name: "foo", + Image: "bar", + Env: []v1.EnvVar{ + { + Name: "XYZ", + Value: "1", + }, + { + Name: "XYZ", + Value: "2", + }, + }, + }, + }, + }, + }, + wantDisruptionTarget: &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByPodGC", + Message: "PodGC: node no longer exists", + }, + }, + "Orphan pod with duplicated ports; scenario from https://issues.k8s.io/113482": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Finalizers: []string{"test.k8s.io/finalizer"}, + }, + Spec: v1.PodSpec{ + NodeName: "non-existing-node", + Containers: []v1.Container{ + { + Name: "foo", + Image: "bar", + Ports: []v1.ContainerPort{ + { + 
ContainerPort: 93, + HostPort: 9376, + }, + { + ContainerPort: 93, + HostPort: 9377, + }, + }, + }, + }, + }, + }, + wantDisruptionTarget: &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByPodGC", + Message: "PodGC: node no longer exists", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, true)() + testCtx := setup(t, "podgc-orphaned") + cs := testCtx.ClientSet + + pod := test.pod + pod.Namespace = testCtx.NS.Name + pod, err := cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod)) + } + defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod) + + // getting evicted due to NodeName being "non-existing-node" + err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, time.Second*15, true, testutils.PodIsGettingEvicted(cs, pod.Namespace, pod.Name)) + if err != nil { + t.Fatalf("Error '%v' while waiting for the pod '%v' to be terminating", err, klog.KObj(pod)) + } + pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Error: '%v' while updating pod info: '%v'", err, klog.KObj(pod)) + } + _, gotDisruptionTarget := podutil.GetPodCondition(&pod.Status, v1.DisruptionTarget) + if diff := cmp.Diff(test.wantDisruptionTarget, gotDisruptionTarget, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")); diff != "" { + t.Errorf("Pod %v has unexpected DisruptionTarget condition: %s", klog.KObj(pod), diff) + } + if gotDisruptionTarget != nil && gotDisruptionTarget.LastTransitionTime.IsZero() { + t.Errorf("Pod %v has DisruptionTarget condition without LastTransitionTime", klog.KObj(pod)) + } + if pod.Status.Phase != v1.PodFailed { + t.Errorf("Unexpected phase for pod 
%q. Got: %q, want: %q", klog.KObj(pod), pod.Status.Phase, v1.PodFailed) + } + }) + } +} + func setup(t *testing.T, name string) *testutils.TestContext { testCtx := testutils.InitTestAPIServer(t, name, nil) externalInformers := informers.NewSharedInformerFactory(testCtx.ClientSet, time.Second)