Merge pull request #113304 from mimowo/handling-pod-failures-beta-ssa

Use SSA to add pod failure conditions
Authored by Kubernetes Prow Robot on 2022-10-28 07:32:32 -07:00, committed by GitHub
commit 3c9928e4f8
5 changed files with 96 additions and 50 deletions
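
This PR replaces the previous flow in these controllers, which deep-copied pod.Status, mutated the copy with apipod.UpdatePodCondition, and patched the result via utilpod.PatchPodStatus, with server-side apply (SSA) against the pod's status subresource: each controller builds a typed apply configuration carrying only the DisruptionTarget condition (and, for PodGC, the phase) it owns and submits it under its own field manager ("DisruptionController", "TaintManager", "PodGC", "KubeScheduler"). A minimal sketch of that shared pattern follows; the helper name, reason text, and the "example-controller" field manager are illustrative, not from the PR:

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
	"k8s.io/client-go/kubernetes"
)

// addDisruptionCondition sketches the SSA pattern adopted here: only the
// fields this manager owns are sent; the server merges them into pod.status.
func addDisruptionCondition(ctx context.Context, client kubernetes.Interface, pod *v1.Pod) error {
	podApply := corev1apply.Pod(pod.Name, pod.Namespace).
		WithStatus(corev1apply.PodStatus().
			WithConditions(corev1apply.PodCondition().
				WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
				WithStatus(v1.ConditionTrue).
				WithReason("ExampleReason"). // illustrative
				WithMessage("example: pod is about to be disrupted").
				WithLastTransitionTime(metav1.Now())))
	// Force: true resolves ownership conflicts in favor of this field manager,
	// which is acceptable because only the single condition is claimed.
	_, err := client.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply,
		metav1.ApplyOptions{FieldManager: "example-controller", Force: true})
	return err
}

Because each component uses a distinct field manager, the API server tracks their ownership of the condition separately, so a later apply by the same manager updates only what it previously set.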

View File

@@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
+corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/discovery"
appsv1informers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -53,7 +54,6 @@ import (
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
-utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/utils/clock"
)
@@ -75,6 +75,9 @@ const (
// Once the timeout is reached, this controller attempts to set the status
// of the condition to False.
stalePodDisruptionTimeout = 2 * time.Minute
+// field manager used to disable the pod failure condition
+fieldManager = "DisruptionController"
)
type updater func(context.Context, *policy.PodDisruptionBudget) error
@@ -748,15 +751,16 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
return nil
}
-newStatus := pod.Status.DeepCopy()
-updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
-Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
-Status: v1.ConditionFalse,
-})
-if !updated {
-return nil
-}
-if _, _, _, err := utilpod.PatchPodStatus(ctx, dc.kubeClient, namespace, name, pod.UID, pod.Status, *newStatus); err != nil {
+podApply := corev1apply.Pod(pod.Name, pod.Namespace).
+WithStatus(corev1apply.PodStatus()).
+WithResourceVersion(pod.ResourceVersion)
+podApply.Status.WithConditions(corev1apply.PodCondition().
+WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+WithStatus(v1.ConditionFalse).
+WithLastTransitionTime(metav1.Now()),
+)
+if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
return err
}
klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))

View File

@@ -31,17 +31,16 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/feature"
+corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
-apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
-utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/klog/v2"
)
@@ -56,6 +55,9 @@ const (
UpdateWorkerSize = 8
podUpdateChannelSize = 1
retries = 5
+// fieldManager used to add pod disruption condition when evicting pods due to NoExecute taint
+fieldManager = "TaintManager"
)
type nodeUpdateItem struct {
@@ -125,18 +127,18 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name,
if err != nil {
return err
}
-newStatus := pod.Status.DeepCopy()
-if apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
-Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
-Status: v1.ConditionTrue,
-Reason: "DeletionByTaintManager",
-Message: "Taint manager: deleting due to NoExecute taint",
-}) {
-if _, _, _, err = utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
+podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus())
+podApply.Status.WithConditions(corev1apply.PodCondition().
+WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+WithStatus(v1.ConditionTrue).
+WithReason("DeletionByTaintManager").
+WithMessage("Taint manager: deleting due to NoExecute taint").
+WithLastTransitionTime(metav1.Now()),
+)
+if _, err := c.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
return err
}
}
}
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
}

View File

@@ -30,16 +30,15 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
+corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
-apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
nodeutil "k8s.io/kubernetes/pkg/util/node"
-utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/kubernetes/pkg/util/taints"
)
@@ -49,6 +48,9 @@ const (
// quarantineTime defines how long Orphaned GC waits for nodes to show up
// in an informer before issuing a GET call to check if they are truly gone
quarantineTime = 40 * time.Second
+// field manager used to add pod failure condition and change the pod phase
+fieldManager = "PodGC"
)
type PodGCController struct {
@@ -236,12 +238,13 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node
continue
}
klog.V(2).InfoS("Found orphaned Pod assigned to the Node, deleting.", "pod", klog.KObj(pod), "node", pod.Spec.NodeName)
-condition := &v1.PodCondition{
-Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
-Status: v1.ConditionTrue,
-Reason: "DeletionByPodGC",
-Message: "PodGC: node no longer exists",
-}
+condition := corev1apply.PodCondition().
+WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+WithStatus(v1.ConditionTrue).
+WithReason("DeletionByPodGC").
+WithMessage("PodGC: node no longer exists").
+WithLastTransitionTime(metav1.Now())
if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
utilruntime.HandleError(err)
} else {
@@ -316,26 +319,57 @@ func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.
return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil)
}
-func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error {
+func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *corev1apply.PodConditionApplyConfiguration) error {
klog.InfoS("PodGC is force deleting Pod", "pod", klog.KRef(pod.Namespace, pod.Name))
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
-newStatus := pod.Status.DeepCopy()
+// Extract the pod status as PodGC may or may not own the pod phase; if
+// it owns the phase, then we need to send the field back if the condition
+// is added.
+podApply, err := corev1apply.ExtractPodStatus(pod, fieldManager)
+if err != nil {
+return nil
+}
+// Set the status in case PodGC does not own any status fields yet
+if podApply.Status == nil {
+podApply.WithStatus(corev1apply.PodStatus())
+}
updated := false
if condition != nil {
-updated = apipod.UpdatePodCondition(newStatus, condition)
+updatePodCondition(podApply.Status, condition)
+updated = true
}
// Mark the pod as failed - this is especially important in case the pod
// is orphaned, in which case the pod would remain in the Running phase
// forever as there is no kubelet running to change the phase.
if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
-newStatus.Phase = v1.PodFailed
+podApply.Status.WithPhase(v1.PodFailed)
updated = true
}
if updated {
-if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
+if _, err := gcc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
return err
}
}
}
return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0))
}
+func updatePodCondition(podStatusApply *corev1apply.PodStatusApplyConfiguration, condition *corev1apply.PodConditionApplyConfiguration) {
+if conditionIndex, _ := findPodConditionApplyByType(podStatusApply.Conditions, *condition.Type); conditionIndex < 0 {
+podStatusApply.WithConditions(condition)
+} else {
+podStatusApply.Conditions[conditionIndex] = *condition
+}
+}
+func findPodConditionApplyByType(conditionApplyList []corev1apply.PodConditionApplyConfiguration, cType v1.PodConditionType) (int, *corev1apply.PodConditionApplyConfiguration) {
+for index, conditionApply := range conditionApplyList {
+if *conditionApply.Type == cType {
+return index, &conditionApply
+}
+}
+return -1, nil
+}
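
These two helpers guard against duplicating the condition: corev1apply.ExtractPodStatus returns whatever PodGC already owns, which may include a DisruptionTarget condition from an earlier attempt, so the condition is replaced in place rather than appended twice. A rough usage sketch, as a hypothetical function inside the same podgc package so it can reach the unexported helpers; the closing comment describes the intended outcome:

// exampleUpdatePodCondition shows that a second call for the same condition
// type replaces the earlier entry instead of appending a duplicate.
func exampleUpdatePodCondition() *corev1apply.PodStatusApplyConfiguration {
	statusApply := corev1apply.PodStatus()
	updatePodCondition(statusApply, corev1apply.PodCondition().
		WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
		WithStatus(v1.ConditionTrue).
		WithReason("DeletionByPodGC"))
	updatePodCondition(statusApply, corev1apply.PodCondition().
		WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
		WithStatus(v1.ConditionTrue).
		WithReason("DeletionByPodGC").
		WithMessage("PodGC: node no longer exists"))
	// statusApply.Conditions now holds exactly one DisruptionTarget entry,
	// carrying the message set by the second call.
	return statusApply
}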

View File

@@ -26,15 +26,16 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/util/feature"
+corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
-apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
@@ -42,6 +43,11 @@ import (
"k8s.io/kubernetes/pkg/scheduler/util"
)
+const (
+// fieldManager used to add pod disruption condition to the victim pods
+fieldManager = "KubeScheduler"
+)
// Candidate represents a nominated node on which the preemptor can be scheduled,
// along with the list of victims that should be evicted for the preemptor to fit the node.
type Candidate interface {
@@ -351,21 +357,21 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
waitingPod.Reject(pluginName, "preempted")
} else {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
-condition := &v1.PodCondition{
-Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
-Status: v1.ConditionTrue,
-Reason: "PreemptionByKubeScheduler",
-Message: "Kube-scheduler: preempting",
-}
-newStatus := pod.Status.DeepCopy()
-if apipod.UpdatePodCondition(newStatus, condition) {
-if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
+victimPodApply := corev1apply.Pod(victim.Name, victim.Namespace).WithStatus(corev1apply.PodStatus())
+victimPodApply.Status.WithConditions(corev1apply.PodCondition().
+WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
+WithStatus(v1.ConditionTrue).
+WithReason("PreemptionByKubeScheduler").
+WithMessage(fmt.Sprintf("Kube-scheduler: preempting to accommodate a higher priority pod: %s", klog.KObj(pod))).
+WithLastTransitionTime(metav1.Now()),
+)
+if _, err := cs.CoreV1().Pods(victim.Namespace).ApplyStatus(ctx, victimPodApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)
return
}
}
}
if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)

View File

@@ -181,7 +181,7 @@ func ObjectReaction(tracker ObjectTracker) ReactionFunc {
if err := json.Unmarshal(modified, obj); err != nil {
return true, nil, err
}
case types.StrategicMergePatchType:
-case types.StrategicMergePatchType:
+case types.StrategicMergePatchType, types.ApplyPatchType:
mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
if err != nil {
return true, nil, err
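
The final hunk is in client-go's testing fixture (the fake clientset's object tracker): ObjectReaction now handles types.ApplyPatchType by treating the apply patch as a strategic merge patch instead of rejecting it as unsupported. This approximates server-side apply without field-ownership tracking, presumably so the controllers' unit tests can exercise the new ApplyStatus calls against a fake clientset. A rough test sketch of what this enables; the names are illustrative and not taken from the PR:

package example

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestApplyStatusOnFakeClientset(t *testing.T) {
	client := fake.NewSimpleClientset(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "victim", Namespace: "ns"},
	})
	podApply := corev1apply.Pod("victim", "ns").
		WithStatus(corev1apply.PodStatus().
			WithConditions(corev1apply.PodCondition().
				WithType(v1.AlphaNoCompatGuaranteeDisruptionTarget).
				WithStatus(v1.ConditionTrue)))
	// Before this change the object tracker rejected apply patches; now the
	// patch is merged into the stored pod via StrategicMergePatch.
	if _, err := client.CoreV1().Pods("ns").ApplyStatus(context.TODO(), podApply,
		metav1.ApplyOptions{FieldManager: "test", Force: true}); err != nil {
		t.Fatalf("ApplyStatus failed: %v", err)
	}
}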