Merge pull request #125994 from carlory/fix-job-api

clean up codes after PodDisruptionConditions was promoted to GA
This commit is contained in:
Kubernetes Prow Robot 2024-07-17 14:37:09 -07:00 committed by GitHub
commit 5d40866fae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 85 additions and 154 deletions

View File

@ -1903,9 +1903,8 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
} }
func isPodFailed(p *v1.Pod, job *batch.Job) bool { func isPodFailed(p *v1.Pod, job *batch.Job) bool {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
// When PodDisruptionConditions is enabled, orphan Pods and unschedulable // Orphan Pods and unschedulable terminating Pods are marked as Failed. So we only need to check the phase.
// terminating Pods are marked as Failed. So we only need to check the phase.
return p.Status.Phase == v1.PodFailed return p.Status.Phase == v1.PodFailed
} }
if p.Status.Phase == v1.PodFailed { if p.Status.Phase == v1.PodFailed {

View File

@ -29,7 +29,6 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
@ -38,7 +37,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod" apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/podgc/metrics" "k8s.io/kubernetes/pkg/controller/podgc/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/eviction"
nodeutil "k8s.io/kubernetes/pkg/util/node" nodeutil "k8s.io/kubernetes/pkg/util/node"
utilpod "k8s.io/kubernetes/pkg/util/pod" utilpod "k8s.io/kubernetes/pkg/util/pod"
@ -343,23 +341,18 @@ func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Cont
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod)) logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod))
// Patch the pod to make sure it is transitioned to the Failed phase before deletion. // Patch the pod to make sure it is transitioned to the Failed phase before deletion.
// This is needed for the JobPodReplacementPolicy feature to make sure Job replacement pods are created. //
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/3939-allow-replacement-when-fully-terminated#risks-and-mitigations // Mark the pod as failed - this is especially important in case the pod
// for more details. // is orphaned, in which case the pod would remain in the Running phase
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) || utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { // forever as there is no kubelet running to change the phase.
if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
// Mark the pod as failed - this is especially important in case the pod newStatus := pod.Status.DeepCopy()
// is orphaned, in which case the pod would remain in the Running phase newStatus.Phase = v1.PodFailed
// forever as there is no kubelet running to change the phase. if condition != nil {
if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed { apipod.UpdatePodCondition(newStatus, condition)
newStatus := pod.Status.DeepCopy() }
newStatus.Phase = v1.PodFailed if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { return err
apipod.UpdatePodCondition(newStatus, condition)
}
if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
} }
} }
return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0)) return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0))

View File

@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/feature"
corev1informers "k8s.io/client-go/informers/core/v1" corev1informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
@ -45,7 +44,6 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/tainteviction/metrics" "k8s.io/kubernetes/pkg/controller/tainteviction/metrics"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node" controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/features"
utilpod "k8s.io/kubernetes/pkg/util/pod" utilpod "k8s.io/kubernetes/pkg/util/pod"
) )
@ -129,23 +127,21 @@ func deletePodHandler(c clientset.Interface, emitEventFunc func(types.Namespaced
} }
func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) { func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{}) if err != nil {
if err != nil { return err
}
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByTaintManager",
Message: "Taint manager: deleting due to NoExecute taint",
})
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err return err
} }
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByTaintManager",
Message: "Taint manager: deleting due to NoExecute taint",
})
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
} }
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{}) return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
} }

View File

@ -411,14 +411,11 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
} }
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc, thresholds, observations) message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc, thresholds, observations)
var condition *v1.PodCondition condition := &v1.PodCondition{
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { Type: v1.DisruptionTarget,
condition = &v1.PodCondition{ Status: v1.ConditionTrue,
Type: v1.DisruptionTarget, Reason: v1.PodReasonTerminationByKubelet,
Status: v1.ConditionTrue, Message: message,
Reason: v1.PodReasonTerminationByKubelet,
Message: message,
}
} }
if m.evictPod(pod, gracePeriodOverride, message, annotations, condition) { if m.evictPod(pod, gracePeriodOverride, message, annotations, condition) {
metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc() metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()

View File

@ -1835,15 +1835,13 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po
} }
} }
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { // copy over the pod disruption conditions from state which is already
// copy over the pod disruption conditions from state which is already // updated during the eviciton (due to either node resource pressure or
// updated during the eviciton (due to either node resource pressure or // node graceful shutdown). We do not re-generate the conditions based
// node graceful shutdown). We do not re-generate the conditions based // on the container statuses as they are added based on one-time events.
// on the container statuses as they are added based on one-time events. cType := v1.DisruptionTarget
cType := v1.DisruptionTarget if _, condition := podutil.GetPodConditionFromList(oldPodStatus.Conditions, cType); condition != nil {
if _, condition := podutil.GetPodConditionFromList(oldPodStatus.Conditions, cType); condition != nil { s.Conditions = utilpod.ReplaceOrAppendPodCondition(s.Conditions, condition)
s.Conditions = utilpod.ReplaceOrAppendPodCondition(s.Conditions, condition)
}
} }
// set all Kubelet-owned conditions // set all Kubelet-owned conditions

View File

@ -381,14 +381,12 @@ func (m *managerImpl) processShutdownEvent() error {
} }
status.Message = nodeShutdownMessage status.Message = nodeShutdownMessage
status.Reason = nodeShutdownReason status.Reason = nodeShutdownReason
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { podutil.UpdatePodCondition(status, &v1.PodCondition{
podutil.UpdatePodCondition(status, &v1.PodCondition{ Type: v1.DisruptionTarget,
Type: v1.DisruptionTarget, Status: v1.ConditionTrue,
Status: v1.ConditionTrue, Reason: v1.PodReasonTerminationByKubelet,
Reason: v1.PodReasonTerminationByKubelet, Message: nodeShutdownMessage,
Message: nodeShutdownMessage, })
})
}
}); err != nil { }); err != nil {
m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err) m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
} else { } else {

View File

@ -21,13 +21,11 @@ import (
"math" "math"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/klog/v2" "k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/api/v1/resource"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -106,14 +104,12 @@ func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod,
status.Phase = v1.PodFailed status.Phase = v1.PodFailed
status.Reason = events.PreemptContainer status.Reason = events.PreemptContainer
status.Message = message status.Message = message
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { podutil.UpdatePodCondition(status, &v1.PodCondition{
podutil.UpdatePodCondition(status, &v1.PodCondition{ Type: v1.DisruptionTarget,
Type: v1.DisruptionTarget, Status: v1.ConditionTrue,
Status: v1.ConditionTrue, Reason: v1.PodReasonTerminationByKubelet,
Reason: v1.PodReasonTerminationByKubelet, Message: "Pod was preempted by Kubelet to accommodate a critical pod.",
Message: "Pod was preempted by Kubelet to accommodate a critical pod.", })
})
}
}) })
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to evict pod", "pod", klog.KObj(pod)) klog.ErrorS(err, "Failed to evict pod", "pod", klog.KObj(pod))

View File

@ -639,10 +639,8 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// Set PodScheduledCondition.LastTransitionTime. // Set PodScheduledCondition.LastTransitionTime.
updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled) updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { // Set DisruptionTarget.LastTransitionTime.
// Set DisruptionTarget.LastTransitionTime. updateLastTransitionTime(&status, &oldStatus, v1.DisruptionTarget)
updateLastTransitionTime(&status, &oldStatus, v1.DisruptionTarget)
}
// ensure that the start time does not change across updates. // ensure that the start time does not change across updates.
if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() { if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {

View File

@ -47,10 +47,5 @@ func PodConditionByKubelet(conditionType v1.PodConditionType) bool {
// PodConditionSharedByKubelet returns if the pod condition type is shared by kubelet // PodConditionSharedByKubelet returns if the pod condition type is shared by kubelet
func PodConditionSharedByKubelet(conditionType v1.PodConditionType) bool { func PodConditionSharedByKubelet(conditionType v1.PodConditionType) bool {
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { return conditionType == v1.DisruptionTarget
if conditionType == v1.DisruptionTarget {
return true
}
}
return false
} }

View File

@ -33,14 +33,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun" "k8s.io/apiserver/pkg/util/dryrun"
"k8s.io/apiserver/pkg/util/feature"
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1" policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
"k8s.io/client-go/util/retry" "k8s.io/client-go/util/retry"
pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget" pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
podutil "k8s.io/kubernetes/pkg/api/pod" podutil "k8s.io/kubernetes/pkg/api/pod"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/policy" "k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/features"
) )
const ( const (
@ -307,7 +305,7 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje
} }
func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string, validation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error { func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string, validation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error {
if !dryrun.IsDryRun(options.DryRun) && feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { if !dryrun.IsDryRun(options.DryRun) {
getLatestPod := func(_ context.Context, _, oldObj runtime.Object) (runtime.Object, error) { getLatestPod := func(_ context.Context, _, oldObj runtime.Object) (runtime.Object, error) {
// Throwaway the newObj. We care only about the latest pod obtained from etcd (oldObj). // Throwaway the newObj. We care only about the latest pod obtained from etcd (oldObj).
// So we can add DisruptionTarget condition in conditionAppender without conflicts. // So we can add DisruptionTarget condition in conditionAppender without conflicts.

View File

@ -253,7 +253,7 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNo
if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil { if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
podPriority := corev1helpers.PodPriority(pod) podPriority := corev1helpers.PodPriority(pod)
for _, p := range nodeInfo.Pods { for _, p := range nodeInfo.Pods {
if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod, pl.fts.EnablePodDisruptionConditions) { if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod) {
// There is a terminating pod on the nominated node. // There is a terminating pod on the nominated node.
return false, "not eligible due to a terminating pod on the nominated node." return false, "not eligible due to a terminating pod on the nominated node."
} }
@ -268,17 +268,12 @@ func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVicti
return nil return nil
} }
// podTerminatingByPreemption returns the pod's terminating state if feature PodDisruptionConditions is not enabled. // podTerminatingByPreemption returns true if the pod is in the termination state caused by scheduler preemption.
// Otherwise, it additionally checks if the termination state is caused by scheduler preemption. func podTerminatingByPreemption(p *v1.Pod) bool {
func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool {
if p.DeletionTimestamp == nil { if p.DeletionTimestamp == nil {
return false return false
} }
if !enablePodDisruptionConditions {
return true
}
for _, condition := range p.Status.Conditions { for _, condition := range p.Status.Conditions {
if condition.Type == v1.DisruptionTarget { if condition.Type == v1.DisruptionTarget {
return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler

View File

@ -1466,7 +1466,6 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
test.fts.EnablePodDisruptionConditions = true
logger, ctx := ktesting.NewTestContext(t) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()

View File

@ -24,7 +24,6 @@ type Features struct {
EnableVolumeCapacityPriority bool EnableVolumeCapacityPriority bool
EnableNodeInclusionPolicyInPodTopologySpread bool EnableNodeInclusionPolicyInPodTopologySpread bool
EnableMatchLabelKeysInPodTopologySpread bool EnableMatchLabelKeysInPodTopologySpread bool
EnablePodDisruptionConditions bool
EnableInPlacePodVerticalScaling bool EnableInPlacePodVerticalScaling bool
EnableSidecarContainers bool EnableSidecarContainers bool
EnableSchedulingQueueHint bool EnableSchedulingQueueHint bool

View File

@ -50,7 +50,6 @@ func NewInTreeRegistry() runtime.Registry {
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority), EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread), EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread), EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
EnablePodDisruptionConditions: feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions),
EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling), EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers), EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers),
EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints), EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),

View File

@ -28,14 +28,12 @@ import (
policy "k8s.io/api/policy/v1" policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1" policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1" corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1"
apipod "k8s.io/kubernetes/pkg/api/v1/pod" apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
@ -362,21 +360,19 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
waitingPod.Reject(pluginName, "preempted") waitingPod.Reject(pluginName, "preempted")
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name()) logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
} else { } else {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { condition := &v1.PodCondition{
condition := &v1.PodCondition{ Type: v1.DisruptionTarget,
Type: v1.DisruptionTarget, Status: v1.ConditionTrue,
Status: v1.ConditionTrue, Reason: v1.PodReasonPreemptionByScheduler,
Reason: v1.PodReasonPreemptionByScheduler, Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName),
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName), }
} newStatus := pod.Status.DeepCopy()
newStatus := pod.Status.DeepCopy() updated := apipod.UpdatePodCondition(newStatus, condition)
updated := apipod.UpdatePodCondition(newStatus, condition) if updated {
if updated { if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil { logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) errCh.SendErrorWithCancel(err, cancel)
errCh.SendErrorWithCancel(err, cancel) return
return
}
} }
} }
if err := util.DeletePod(ctx, cs, victim); err != nil { if err := util.DeletePod(ctx, cs, victim); err != nil {

View File

@ -135,13 +135,11 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
rbacv1helpers.NewRule("get", "list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), rbacv1helpers.NewRule("get", "list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), rbacv1helpers.NewRule("get", "list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
rbacv1helpers.NewRule("update").Groups(policyGroup).Resources("poddisruptionbudgets/status").RuleOrDie(), rbacv1helpers.NewRule("update").Groups(policyGroup).Resources("poddisruptionbudgets/status").RuleOrDie(),
rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
rbacv1helpers.NewRule("get").Groups("*").Resources("*/scale").RuleOrDie(), rbacv1helpers.NewRule("get").Groups("*").Resources("*/scale").RuleOrDie(),
eventsRule(), eventsRule(),
}, },
} }
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie())
}
return role return role
}()) }())
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
@ -269,13 +267,10 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(), rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(),
// used for pod deletion // used for pod deletion
rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
rbacv1helpers.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), rbacv1helpers.NewRule("list", "get", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
eventsRule(), eventsRule(),
}, },
} }
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get").Groups(legacyGroup).Resources("pods").RuleOrDie())
}
return role return role
}()) }())
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
@ -307,11 +302,9 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
Rules: []rbacv1.PolicyRule{ Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), rbacv1helpers.NewRule("list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(), rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
rbacv1helpers.NewRule("patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
}, },
} }
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie())
}
return role return role
}()) }())
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{

View File

@ -415,6 +415,13 @@ items:
- poddisruptionbudgets/status - poddisruptionbudgets/status
verbs: verbs:
- update - update
- apiGroups:
- ""
resources:
- pods/status
verbs:
- patch
- update
- apiGroups: - apiGroups:
- '*' - '*'
resources: resources:
@ -430,13 +437,6 @@ items:
- create - create
- patch - patch
- update - update
- apiGroups:
- ""
resources:
- pods/status
verbs:
- patch
- update
- apiVersion: rbac.authorization.k8s.io/v1 - apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:
@ -930,6 +930,7 @@ items:
- pods - pods
verbs: verbs:
- delete - delete
- get
- list - list
- apiGroups: - apiGroups:
- "" - ""
@ -940,12 +941,6 @@ items:
- create - create
- patch - patch
- update - update
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- apiVersion: rbac.authorization.k8s.io/v1 - apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:

View File

@ -40,10 +40,9 @@ import (
// TestPodGcOrphanedPodsWithFinalizer tests deletion of orphaned pods // TestPodGcOrphanedPodsWithFinalizer tests deletion of orphaned pods
func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) { func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) {
tests := map[string]struct { tests := map[string]struct {
enableJobPodReplacementPolicy bool phase v1.PodPhase
phase v1.PodPhase wantPhase v1.PodPhase
wantPhase v1.PodPhase wantDisruptionTarget *v1.PodCondition
wantDisruptionTarget *v1.PodCondition
}{ }{
"pending pod": { "pending pod": {
phase: v1.PodPending, phase: v1.PodPending,
@ -55,17 +54,6 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) {
Message: "PodGC: node no longer exists", Message: "PodGC: node no longer exists",
}, },
}, },
"pending pod; PodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
phase: v1.PodPending,
wantPhase: v1.PodFailed,
wantDisruptionTarget: &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByPodGC",
Message: "PodGC: node no longer exists",
},
},
"succeeded pod": { "succeeded pod": {
phase: v1.PodSucceeded, phase: v1.PodSucceeded,
wantPhase: v1.PodSucceeded, wantPhase: v1.PodSucceeded,
@ -78,7 +66,6 @@ func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) {
for name, test := range tests { for name, test := range tests {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enableJobPodReplacementPolicy)
testCtx := setup(t, "podgc-orphaned") testCtx := setup(t, "podgc-orphaned")
cs := testCtx.ClientSet cs := testCtx.ClientSet