Merge pull request #121103 from mimowo/dont-use-ssa-for-podgc

Use Patch instead of SSA for Pod Disruption condition
This commit is contained in:
Kubernetes Prow Robot 2023-10-20 00:08:10 +02:00 committed by GitHub
commit 88d9573c30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 167 additions and 73 deletions

View File

@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
appsv1informers "k8s.io/client-go/informers/apps/v1" appsv1informers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
@ -74,9 +73,6 @@ const (
// Once the timeout is reached, this controller attempts to set the status // Once the timeout is reached, this controller attempts to set the status
// of the condition to False. // of the condition to False.
stalePodDisruptionTimeout = 2 * time.Minute stalePodDisruptionTimeout = 2 * time.Minute
// field manager used to disable the pod failure condition
fieldManager = "DisruptionController"
) )
type updater func(context.Context, *policy.PodDisruptionBudget) error type updater func(context.Context, *policy.PodDisruptionBudget) error
@ -770,16 +766,15 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
return nil return nil
} }
podApply := corev1apply.Pod(pod.Name, pod.Namespace). newPod := pod.DeepCopy()
WithStatus(corev1apply.PodStatus()). updated := apipod.UpdatePodCondition(&newPod.Status, &v1.PodCondition{
WithResourceVersion(pod.ResourceVersion) Type: v1.DisruptionTarget,
podApply.Status.WithConditions(corev1apply.PodCondition(). Status: v1.ConditionFalse,
WithType(v1.DisruptionTarget). })
WithStatus(v1.ConditionFalse). if !updated {
WithLastTransitionTime(metav1.Now()), return nil
) }
if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, newPod, metav1.UpdateOptions{}); err != nil {
if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
return err return err
} }
logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod)) logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))

View File

@ -31,16 +31,17 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/feature"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -55,9 +56,6 @@ const (
UpdateWorkerSize = 8 UpdateWorkerSize = 8
podUpdateChannelSize = 1 podUpdateChannelSize = 1
retries = 5 retries = 5
// fieldManager used to add pod disruption condition when evicting pods due to NoExecute taint
fieldManager = "TaintManager"
) )
type nodeUpdateItem struct { type nodeUpdateItem struct {
@ -127,16 +125,17 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name,
if err != nil { if err != nil {
return err return err
} }
podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus()) newStatus := pod.Status.DeepCopy()
podApply.Status.WithConditions(corev1apply.PodCondition(). updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
WithType(v1.DisruptionTarget). Type: v1.DisruptionTarget,
WithStatus(v1.ConditionTrue). Status: v1.ConditionTrue,
WithReason("DeletionByTaintManager"). Reason: "DeletionByTaintManager",
WithMessage("Taint manager: deleting due to NoExecute taint"). Message: "Taint manager: deleting due to NoExecute taint",
WithLastTransitionTime(metav1.Now()), })
) if updated {
if _, err := c.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err return err
}
} }
} }
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{}) return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})

View File

@ -30,17 +30,18 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2" "k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/podgc/metrics" "k8s.io/kubernetes/pkg/controller/podgc/metrics"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/eviction"
nodeutil "k8s.io/kubernetes/pkg/util/node" nodeutil "k8s.io/kubernetes/pkg/util/node"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/pkg/util/taints"
) )
@ -50,9 +51,6 @@ const (
// quarantineTime defines how long Orphaned GC waits for nodes to show up // quarantineTime defines how long Orphaned GC waits for nodes to show up
// in an informer before issuing a GET call to check if they are truly gone // in an informer before issuing a GET call to check if they are truly gone
quarantineTime = 40 * time.Second quarantineTime = 40 * time.Second
// field manager used to add pod failure condition and change the pod phase
fieldManager = "PodGC"
) )
type PodGCController struct { type PodGCController struct {
@ -249,12 +247,12 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node
continue continue
} }
logger.V(2).Info("Found orphaned Pod assigned to the Node, deleting", "pod", klog.KObj(pod), "node", klog.KRef("", pod.Spec.NodeName)) logger.V(2).Info("Found orphaned Pod assigned to the Node, deleting", "pod", klog.KObj(pod), "node", klog.KRef("", pod.Spec.NodeName))
condition := corev1apply.PodCondition(). condition := &v1.PodCondition{
WithType(v1.DisruptionTarget). Type: v1.DisruptionTarget,
WithStatus(v1.ConditionTrue). Status: v1.ConditionTrue,
WithReason("DeletionByPodGC"). Reason: "DeletionByPodGC",
WithMessage("PodGC: node no longer exists"). Message: "PodGC: node no longer exists",
WithLastTransitionTime(metav1.Now()) }
if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil { if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc() metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc()
@ -341,7 +339,7 @@ func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.
return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil) return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil)
} }
func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *corev1apply.PodConditionApplyConfiguration) error { func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod)) logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod))
// Patch the pod to make sure it is transitioned to the Failed phase before deletion. // Patch the pod to make sure it is transitioned to the Failed phase before deletion.
@ -354,17 +352,12 @@ func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Cont
// is orphaned, in which case the pod would remain in the Running phase // is orphaned, in which case the pod would remain in the Running phase
// forever as there is no kubelet running to change the phase. // forever as there is no kubelet running to change the phase.
if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed { if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus()) newStatus := pod.Status.DeepCopy()
// we don't need to extract the pod apply configuration and can send newStatus.Phase = v1.PodFailed
// only phase and the DisruptionTarget condition as PodGC would not
// own other fields. If the DisruptionTarget condition is owned by
// PodGC it means that it is in the Failed phase, so sending the
// condition will not be re-attempted.
podApply.Status.WithPhase(v1.PodFailed)
if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
podApply.Status.WithConditions(condition) apipod.UpdatePodCondition(newStatus, condition)
} }
if _, err := gcc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err return err
} }
} }

View File

@ -714,16 +714,16 @@ func TestGCInspectingPatchedPodBeforeDeletion(t *testing.T) {
Status: v1.PodStatus{ Status: v1.PodStatus{
Phase: v1.PodFailed, Phase: v1.PodFailed,
Conditions: []v1.PodCondition{ Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{ {
Type: v1.DisruptionTarget, Type: v1.DisruptionTarget,
Status: v1.ConditionTrue, Status: v1.ConditionTrue,
Reason: "DeletionByPodGC", Reason: "DeletionByPodGC",
Message: "PodGC: node no longer exists", Message: "PodGC: node no longer exists",
}, },
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
}, },
}, },
}, },

View File

@ -26,16 +26,15 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1" policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/feature"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1" policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1" corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
@ -43,11 +42,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
) )
const (
// fieldManager used to add pod disruption condition to the victim pods
fieldManager = "KubeScheduler"
)
// Candidate represents a nominated node on which the preemptor can be scheduled, // Candidate represents a nominated node on which the preemptor can be scheduled,
// along with the list of victims that should be evicted for the preemptor to fit the node. // along with the list of victims that should be evicted for the preemptor to fit the node.
type Candidate interface { type Candidate interface {
@ -362,19 +356,20 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name()) klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
} else { } else {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
victimPodApply := corev1apply.Pod(victim.Name, victim.Namespace).WithStatus(corev1apply.PodStatus()) condition := &v1.PodCondition{
victimPodApply.Status.WithConditions(corev1apply.PodCondition(). Type: v1.DisruptionTarget,
WithType(v1.DisruptionTarget). Status: v1.ConditionTrue,
WithStatus(v1.ConditionTrue). Reason: v1.PodReasonPreemptionByScheduler,
WithReason(v1.PodReasonPreemptionByScheduler). Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName),
WithMessage(fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName)). }
WithLastTransitionTime(metav1.Now()), newStatus := pod.Status.DeepCopy()
) updated := apipod.UpdatePodCondition(newStatus, condition)
if updated {
if _, err := cs.CoreV1().Pods(victim.Namespace).ApplyStatus(ctx, victimPodApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel) errCh.SendErrorWithCancel(err, cancel)
return return
}
} }
} }
if err := util.DeletePod(ctx, cs, victim); err != nil { if err := util.DeletePod(ctx, cs, victim); err != nil {

View File

@ -140,7 +140,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
}, },
} }
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie()) role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie())
} }
return role return role
}()) }())

View File

@ -436,6 +436,7 @@ items:
- pods/status - pods/status
verbs: verbs:
- patch - patch
- update
- apiVersion: rbac.authorization.k8s.io/v1 - apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:

View File

@ -309,6 +309,117 @@ func TestTerminatingOnOutOfServiceNode(t *testing.T) {
} }
} }
// TestPodGcForPodsWithDuplicatedFieldKeys is a regression test for
// https://issues.k8s.io/118261. It verifies that PodGC can transition an
// orphaned pod to the Failed phase and attach the DisruptionTarget condition
// even when the pod spec contains duplicated list-map keys (duplicate env var
// names, duplicate container ports), which previously broke server-side-apply
// based status updates.
func TestPodGcForPodsWithDuplicatedFieldKeys(t *testing.T) {
	tests := map[string]struct {
		pod                  *v1.Pod
		wantDisruptionTarget *v1.PodCondition
	}{
		"Orphan pod with duplicated env vars": {
			pod: &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name:       "testpod",
					Finalizers: []string{"test.k8s.io/finalizer"},
				},
				Spec: v1.PodSpec{
					NodeName: "non-existing-node",
					Containers: []v1.Container{
						{
							Name:  "foo",
							Image: "bar",
							Env: []v1.EnvVar{
								{
									Name:  "XYZ",
									Value: "1",
								},
								{
									Name:  "XYZ",
									Value: "2",
								},
							},
						},
					},
				},
			},
			wantDisruptionTarget: &v1.PodCondition{
				Type:    v1.DisruptionTarget,
				Status:  v1.ConditionTrue,
				Reason:  "DeletionByPodGC",
				Message: "PodGC: node no longer exists",
			},
		},
		"Orphan pod with duplicated ports; scenario from https://issues.k8s.io/113482": {
			pod: &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name:       "testpod",
					Finalizers: []string{"test.k8s.io/finalizer"},
				},
				Spec: v1.PodSpec{
					NodeName: "non-existing-node",
					Containers: []v1.Container{
						{
							Name:  "foo",
							Image: "bar",
							Ports: []v1.ContainerPort{
								{
									ContainerPort: 93,
									HostPort:      9376,
								},
								{
									ContainerPort: 93,
									HostPort:      9377,
								},
							},
						},
					},
				},
			},
			wantDisruptionTarget: &v1.PodCondition{
				Type:    v1.DisruptionTarget,
				Status:  v1.ConditionTrue,
				Reason:  "DeletionByPodGC",
				Message: "PodGC: node no longer exists",
			},
		},
	}
	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, true)()
			testCtx := setup(t, "podgc-orphaned")
			cs := testCtx.ClientSet

			pod := test.pod
			// Use the namespace name created for this test. Note: the previous
			// code read testCtx.NS.Namespace, which on a Namespace object is
			// always the empty string; it only worked because the server
			// defaulted the namespace from the request path below.
			pod.Namespace = testCtx.NS.Name
			pod, err := cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{})
			if err != nil {
				t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
			}
			// The finalizer keeps the object readable after deletion; drop it
			// at the end so the namespace can be cleaned up.
			defer testutils.RemovePodFinalizers(testCtx.Ctx, testCtx.ClientSet, t, *pod)

			// The pod gets evicted by PodGC because NodeName points at a
			// non-existing node.
			err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, time.Second*15, true, testutils.PodIsGettingEvicted(cs, pod.Namespace, pod.Name))
			if err != nil {
				t.Fatalf("Error '%v' while waiting for the pod '%v' to be terminating", err, klog.KObj(pod))
			}
			pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{})
			if err != nil {
				t.Fatalf("Error: '%v' while updating pod info: '%v'", err, klog.KObj(pod))
			}
			_, gotDisruptionTarget := podutil.GetPodCondition(&pod.Status, v1.DisruptionTarget)
			if diff := cmp.Diff(test.wantDisruptionTarget, gotDisruptionTarget, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")); diff != "" {
				t.Errorf("Pod %v has unexpected DisruptionTarget condition: %s", klog.KObj(pod), diff)
			}
			if gotDisruptionTarget != nil && gotDisruptionTarget.LastTransitionTime.IsZero() {
				t.Errorf("Pod %v has DisruptionTarget condition without LastTransitionTime", klog.KObj(pod))
			}
			if pod.Status.Phase != v1.PodFailed {
				t.Errorf("Unexpected phase for pod %q. Got: %q, want: %q", klog.KObj(pod), pod.Status.Phase, v1.PodFailed)
			}
		})
	}
}
func setup(t *testing.T, name string) *testutils.TestContext { func setup(t *testing.T, name string) *testutils.TestContext {
testCtx := testutils.InitTestAPIServer(t, name, nil) testCtx := testutils.InitTestAPIServer(t, name, nil)
externalInformers := informers.NewSharedInformerFactory(testCtx.ClientSet, time.Second) externalInformers := informers.NewSharedInformerFactory(testCtx.ClientSet, time.Second)