From fea883687feafc597591e15773f8c6f491b35d08 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Mon, 24 Oct 2022 14:55:16 +0200
Subject: [PATCH] SSA to add pod failure conditions - ready for review

---
 pkg/controller/disruption/disruption.go       | 24 ++++----
 .../nodelifecycle/scheduler/taint_manager.go  | 26 ++++----
 pkg/controller/podgc/gc_controller.go         | 60 +++++++++++++++----
 .../framework/preemption/preemption.go        | 34 ++++++-----
 .../src/k8s.io/client-go/testing/fixture.go   |  2 +-
 5 files changed, 96 insertions(+), 50 deletions(-)

diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go
index 581992fc100..b69eb8c9481 100644
--- a/pkg/controller/disruption/disruption.go
+++ b/pkg/controller/disruption/disruption.go
@@ -35,6 +35,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
 	"k8s.io/client-go/discovery"
 	appsv1informers "k8s.io/client-go/informers/apps/v1"
 	coreinformers "k8s.io/client-go/informers/core/v1"
@@ -53,7 +54,6 @@ import (
 	"k8s.io/klog/v2"
 	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller"
-	utilpod "k8s.io/kubernetes/pkg/util/pod"
 	"k8s.io/utils/clock"
 )
 
@@ -75,6 +75,9 @@ const (
 	// Once the timeout is reached, this controller attempts to set the status
 	// of the condition to False.
 	stalePodDisruptionTimeout = 2 * time.Minute
+
+	// field manager used to disable the pod failure condition
+	fieldManager = "DisruptionController"
 )
 
 type updater func(context.Context, *policy.PodDisruptionBudget) error
@@ -748,15 +751,16 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
 		return nil
 	}
 
-	newStatus := pod.Status.DeepCopy()
-	updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
-		Type:   v1.AlphaNoCompatGuaranteeDisruptionTarget,
-		Status: v1.ConditionFalse,
-	})
-	if !updated {
-		return nil
-	}
-	if _, _, _, err := utilpod.PatchPodStatus(ctx, dc.kubeClient, namespace, name, pod.UID, pod.Status, *newStatus); err != nil {
+	podApply := corev1apply.Pod(pod.Name, pod.Namespace).
+		WithStatus(corev1apply.PodStatus()).
+		WithResourceVersion(pod.ResourceVersion)
+	podApply.Status.WithConditions(corev1apply.PodCondition().
+		WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+		WithStatus(v1.ConditionFalse).
+		WithLastTransitionTime(metav1.Now()),
+	)
+
+	if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
 		return err
 	}
 	klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
index b79435d5081..bc50f8eb697 100644
--- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go
+++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
@@ -31,17 +31,16 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apiserver/pkg/util/feature"
+	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
-	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/apis/core/helper"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/features"
-	utilpod "k8s.io/kubernetes/pkg/util/pod"
 
 	"k8s.io/klog/v2"
 )
@@ -56,6 +55,9 @@ const (
 	UpdateWorkerSize     = 8
 	podUpdateChannelSize = 1
 	retries              = 5
+
+	// fieldManager used to add pod disruption condition when evicting pods due to NoExecute taint
+	fieldManager = "TaintManager"
 )
 
 type nodeUpdateItem struct {
@@ -125,16 +127,16 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name,
 		if err != nil {
 			return err
 		}
-		newStatus := pod.Status.DeepCopy()
-		if apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
-			Type:    v1.AlphaNoCompatGuaranteeDisruptionTarget,
-			Status:  v1.ConditionTrue,
-			Reason:  "DeletionByTaintManager",
-			Message: "Taint manager: deleting due to NoExecute taint",
-		}) {
-			if _, _, _, err = utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
-				return err
-			}
+		podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus())
+		podApply.Status.WithConditions(corev1apply.PodCondition().
+			WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+			WithStatus(v1.ConditionTrue).
+			WithReason("DeletionByTaintManager").
+			WithMessage("Taint manager: deleting due to NoExecute taint").
+			WithLastTransitionTime(metav1.Now()),
+		)
+		if _, err := c.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
+			return err
 		}
 	}
 	return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go
index 0257751319a..afba67b7f48 100644
--- a/pkg/controller/podgc/gc_controller.go
+++ b/pkg/controller/podgc/gc_controller.go
@@ -30,16 +30,15 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
-	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/features"
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
-	utilpod "k8s.io/kubernetes/pkg/util/pod"
 	"k8s.io/kubernetes/pkg/util/taints"
 )
 
@@ -49,6 +48,9 @@ const (
 	// quarantineTime defines how long Orphaned GC waits for nodes to show up
 	// in an informer before issuing a GET call to check if they are truly gone
 	quarantineTime = 40 * time.Second
+
+	// field manager used to add pod failure condition and change the pod phase
+	fieldManager = "PodGC"
 )
 
 type PodGCController struct {
@@ -236,12 +238,13 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node
 			continue
 		}
 		klog.V(2).InfoS("Found orphaned Pod assigned to the Node, deleting.", "pod", klog.KObj(pod), "node", pod.Spec.NodeName)
-		condition := &v1.PodCondition{
-			Type:    v1.AlphaNoCompatGuaranteeDisruptionTarget,
-			Status:  v1.ConditionTrue,
-			Reason:  "DeletionByPodGC",
-			Message: "PodGC: node no longer exists",
-		}
+		condition := corev1apply.PodCondition().
+			WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+			WithStatus(v1.ConditionTrue).
+			WithReason("DeletionByPodGC").
+			WithMessage("PodGC: node no longer exists").
+			WithLastTransitionTime(metav1.Now())
+
 		if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
 			utilruntime.HandleError(err)
 		} else {
@@ -316,26 +319,57 @@ func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.
 	return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil)
 }
 
-func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error {
+func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *corev1apply.PodConditionApplyConfiguration) error {
 	klog.InfoS("PodGC is force deleting Pod", "pod", klog.KRef(pod.Namespace, pod.Name))
 	if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
-		newStatus := pod.Status.DeepCopy()
+
+		// Extract the pod status, as PodGC may or may not own the pod phase. If
+		// it owns the phase, the field has to be sent back if the condition
+		// is added.
+		podApply, err := corev1apply.ExtractPodStatus(pod, fieldManager)
+		if err != nil {
+			return err
+		}
+
+		// Set the status in case PodGC does not own any status fields yet
+		if podApply.Status == nil {
+			podApply.WithStatus(corev1apply.PodStatus())
+		}
+
 		updated := false
 		if condition != nil {
-			updated = apipod.UpdatePodCondition(newStatus, condition)
+			updatePodCondition(podApply.Status, condition)
+			updated = true
 		}
 		// Mark the pod as failed - this is especially important in case the pod
 		// is orphaned, in which case the pod would remain in the Running phase
 		// forever as there is no kubelet running to change the phase.
 		if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
-			newStatus.Phase = v1.PodFailed
+			podApply.Status.WithPhase(v1.PodFailed)
 			updated = true
 		}
 		if updated {
-			if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
+			if _, err := gcc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
 				return err
 			}
 		}
 	}
 	return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0))
 }
+
+func updatePodCondition(podStatusApply *corev1apply.PodStatusApplyConfiguration, condition *corev1apply.PodConditionApplyConfiguration) {
+	if conditionIndex, _ := findPodConditionApplyByType(podStatusApply.Conditions, *condition.Type); conditionIndex < 0 {
+		podStatusApply.WithConditions(condition)
+	} else {
+		podStatusApply.Conditions[conditionIndex] = *condition
+	}
+}
+
+func findPodConditionApplyByType(conditionApplyList []corev1apply.PodConditionApplyConfiguration, cType v1.PodConditionType) (int, *corev1apply.PodConditionApplyConfiguration) {
+	for index, conditionApply := range conditionApplyList {
+		if *conditionApply.Type == cType {
+			return index, &conditionApply
+		}
+	}
+	return -1, nil
+}
diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 8834fb5fe43..8d0c51479ef 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -26,15 +26,16 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apiserver/pkg/util/feature"
+	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	policylisters "k8s.io/client-go/listers/policy/v1"
 	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
 	"k8s.io/klog/v2"
 	extenderv1 "k8s.io/kube-scheduler/extender/v1"
-	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
@@ -42,6 +43,11 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
+const (
+	// fieldManager used to add pod disruption condition to the victim pods
+	fieldManager = "KubeScheduler"
+)
+
 // Candidate represents a nominated node on which the preemptor can be scheduled,
 // along with the list of victims that should be evicted for the preemptor to fit the node.
 type Candidate interface {
@@ -351,19 +357,19 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
 			waitingPod.Reject(pluginName, "preempted")
 		} else {
 			if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
-				condition := &v1.PodCondition{
-					Type:    v1.AlphaNoCompatGuaranteeDisruptionTarget,
-					Status:  v1.ConditionTrue,
-					Reason:  "PreemptionByKubeScheduler",
-					Message: "Kube-scheduler: preempting",
-				}
-				newStatus := pod.Status.DeepCopy()
-				if apipod.UpdatePodCondition(newStatus, condition) {
-					if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
-						klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
-						errCh.SendErrorWithCancel(err, cancel)
-						return
-					}
+				victimPodApply := corev1apply.Pod(victim.Name, victim.Namespace).WithStatus(corev1apply.PodStatus())
+				victimPodApply.Status.WithConditions(corev1apply.PodCondition().
+					WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+					WithStatus(v1.ConditionTrue).
+					WithReason("PreemptionByKubeScheduler").
+					WithMessage(fmt.Sprintf("Kube-scheduler: preempting to accommodate a higher priority pod: %s", klog.KObj(pod))).
+					WithLastTransitionTime(metav1.Now()),
+				)
+
+				if _, err := cs.CoreV1().Pods(victim.Namespace).ApplyStatus(ctx, victimPodApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
+					klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
+					errCh.SendErrorWithCancel(err, cancel)
+					return
 				}
 			}
 			if err := util.DeletePod(ctx, cs, victim); err != nil {
diff --git a/staging/src/k8s.io/client-go/testing/fixture.go b/staging/src/k8s.io/client-go/testing/fixture.go
index 85444f9d055..396840670fd 100644
--- a/staging/src/k8s.io/client-go/testing/fixture.go
+++ b/staging/src/k8s.io/client-go/testing/fixture.go
@@ -181,7 +181,7 @@ func ObjectReaction(tracker ObjectTracker) ReactionFunc {
 			if err := json.Unmarshal(modified, obj); err != nil {
 				return true, nil, err
 			}
-		case types.StrategicMergePatchType:
+		case types.StrategicMergePatchType, types.ApplyPatchType:
 			mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
 			if err != nil {
 				return true, nil, err