From 91742e2393d551d3bdc96daf182db3af865c5dc1 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Tue, 20 Dec 2022 16:31:21 -0800 Subject: [PATCH 1/2] Enhanced logic to identify eligible preemption node --- .../defaultpreemption/default_preemption.go | 23 +++++++++++- .../default_preemption_test.go | 37 ++++++++++++++++++- .../framework/plugins/feature/feature.go | 1 + pkg/scheduler/framework/plugins/registry.go | 1 + .../framework/preemption/preemption.go | 2 +- staging/src/k8s.io/api/core/v1/types.go | 4 ++ 6 files changed, 65 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index c1c5165b5b2..20bef23a47e 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -49,6 +49,7 @@ const Name = names.DefaultPreemption // DefaultPreemption is a PostFilter plugin implements the preemption logic. 
type DefaultPreemption struct { fh framework.Handle + fts feature.Features args config.DefaultPreemptionArgs podLister corelisters.PodLister pdbLister policylisters.PodDisruptionBudgetLister @@ -72,6 +73,7 @@ func New(dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram } pl := DefaultPreemption{ fh: fh, + fts: fts, args: *args, podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(), pdbLister: getPDBLister(fh.SharedInformerFactory()), @@ -250,7 +252,7 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNo if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil { podPriority := corev1helpers.PodPriority(pod) for _, p := range nodeInfo.Pods { - if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority { + if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod, pl.fts.EnablePodDisruptionConditions) { // There is a terminating pod on the nominated node. return false, "not eligible due to a terminating pod on the nominated node." } @@ -260,6 +262,25 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNo return true, "" } +// podTerminatingByPreemption reports whether the pod is terminating. If feature PodDisruptionConditions is enabled, +// it additionally requires that the termination was caused by scheduler preemption. 
+func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool { + if p.DeletionTimestamp == nil { + return false + } + + if !enablePodDisruptionConditions { + return true + } + + for _, condition := range p.Status.Conditions { + if condition.Type == v1.DisruptionTarget { + return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByKubeScheduler + } + } + return false +} + // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" // and "nonViolatingPods" based on whether their PDBs will be violated if they are // preempted. diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index b67341a3cc5..bc059a9b89a 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -1401,6 +1401,7 @@ func TestSelectBestCandidate(t *testing.T) { func TestPodEligibleToPreemptOthers(t *testing.T) { tests := []struct { name string + fts feature.Features pod *v1.Pod pods []*v1.Pod nodes []string @@ -1439,6 +1440,40 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { nominatedNodeStatus: nil, expected: false, }, + { + name: "victim Pods terminating, feature PodDisruptionConditions is enabled", + fts: feature.Features{EnablePodDisruptionConditions: true}, + pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(), + pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Priority(lowPriority).Node("node1").Terminating(). 
+ Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByKubeScheduler).Obj()}, + nodes: []string{"node1"}, + expected: false, + }, + { + name: "non-victim Pods terminating, feature PodDisruptionConditions is enabled", + fts: feature.Features{EnablePodDisruptionConditions: true}, + pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(), + pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Priority(lowPriority).Node("node1").Terminating().Obj()}, + nodes: []string{"node1"}, + expected: true, + }, + { + name: "victim Pods terminating, feature PodDisruptionConditions is disabled", + fts: feature.Features{EnablePodDisruptionConditions: false}, + pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(), + pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Priority(lowPriority).Node("node1").Terminating(). + Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByKubeScheduler).Obj()}, + nodes: []string{"node1"}, + expected: false, + }, + { + name: "non-victim Pods terminating, feature PodDisruptionConditions is disabled", + fts: feature.Features{EnablePodDisruptionConditions: false}, + pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(), + pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Priority(lowPriority).Node("node1").Terminating().Obj()}, + nodes: []string{"node1"}, + expected: false, + }, } for _, test := range tests { @@ -1459,7 +1494,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { if err != nil { t.Fatal(err) } - pl := DefaultPreemption{fh: f} + pl := DefaultPreemption{fh: f, fts: test.fts} if got, _ := pl.PodEligibleToPreemptOthers(test.pod, test.nominatedNodeStatus); got != test.expected { t.Errorf("expected %t, got %t for pod: %s", test.expected, got, test.pod.Name) } diff --git a/pkg/scheduler/framework/plugins/feature/feature.go 
b/pkg/scheduler/framework/plugins/feature/feature.go index e0790a681ca..04771b0d0f4 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -27,4 +27,5 @@ type Features struct { EnableNodeInclusionPolicyInPodTopologySpread bool EnableMatchLabelKeysInPodTopologySpread bool EnablePodSchedulingReadiness bool + EnablePodDisruptionConditions bool } diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 5769aee6e12..a3a9a0f1387 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -54,6 +54,7 @@ func NewInTreeRegistry() runtime.Registry { EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread), EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread), EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness), + EnablePodDisruptionConditions: feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions), } registry := runtime.Registry{ diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 32e1e1ba743..ee30605def2 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -361,7 +361,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. victimPodApply.Status.WithConditions(corev1apply.PodCondition(). WithType(v1.DisruptionTarget). WithStatus(v1.ConditionTrue). - WithReason("PreemptionByKubeScheduler"). + WithReason(v1.PodReasonPreemptionByKubeScheduler). WithMessage(fmt.Sprintf("Kube-scheduler: preempting to accommodate a higher priority pod: %s", klog.KObj(pod))). 
WithLastTransitionTime(metav1.Now()), ) diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index 43b1a5d3977..46e2b6d12ea 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -2721,6 +2721,10 @@ const ( // TerminationByKubelet reason in DisruptionTarget pod condition indicates that the termination // is initiated by kubelet PodReasonTerminationByKubelet = "TerminationByKubelet" + + // PodReasonPreemptionByKubeScheduler reason in DisruptionTarget pod condition indicates that the + // disruption was initiated by scheduler's preemption. + PodReasonPreemptionByKubeScheduler = "PreemptionByKubeScheduler" ) // PodCondition contains details for the current condition of this pod. From 9b64025f36630af9dae1600e231e6b429ae6bda6 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Thu, 5 Jan 2023 10:31:48 -0800 Subject: [PATCH 2/2] rename 'PreemptionByKubeScheduler' to 'PreemptionByScheduler' --- .../framework/plugins/defaultpreemption/default_preemption.go | 2 +- .../plugins/defaultpreemption/default_preemption_test.go | 4 ++-- pkg/scheduler/framework/preemption/preemption.go | 2 +- staging/src/k8s.io/api/core/v1/types.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index 20bef23a47e..711b54bf37d 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -275,7 +275,7 @@ func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) b for _, condition := range p.Status.Conditions { if condition.Type == v1.DisruptionTarget { - return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByKubeScheduler + return condition.Status == v1.ConditionTrue && condition.Reason == 
v1.PodReasonPreemptionByScheduler } } return false diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index bc059a9b89a..97916380a62 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -1445,7 +1445,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { fts: feature.Features{EnablePodDisruptionConditions: true}, pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(), pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Priority(lowPriority).Node("node1").Terminating(). - Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByKubeScheduler).Obj()}, + Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Obj()}, nodes: []string{"node1"}, expected: false, }, @@ -1462,7 +1462,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { fts: feature.Features{EnablePodDisruptionConditions: false}, pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(), pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Priority(lowPriority).Node("node1").Terminating(). - Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByKubeScheduler).Obj()}, + Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Obj()}, nodes: []string{"node1"}, expected: false, }, diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index ee30605def2..58706af76d4 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -361,7 +361,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. 
victimPodApply.Status.WithConditions(corev1apply.PodCondition(). WithType(v1.DisruptionTarget). WithStatus(v1.ConditionTrue). - WithReason(v1.PodReasonPreemptionByKubeScheduler). + WithReason(v1.PodReasonPreemptionByScheduler). WithMessage(fmt.Sprintf("Kube-scheduler: preempting to accommodate a higher priority pod: %s", klog.KObj(pod))). WithLastTransitionTime(metav1.Now()), ) diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index 46e2b6d12ea..341560b2c84 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -2722,9 +2722,9 @@ const ( // is initiated by kubelet PodReasonTerminationByKubelet = "TerminationByKubelet" - // PodReasonPreemptionByKubeScheduler reason in DisruptionTarget pod condition indicates that the + // PodReasonPreemptionByScheduler reason in DisruptionTarget pod condition indicates that the // disruption was initiated by scheduler's preemption. - PodReasonPreemptionByKubeScheduler = "PreemptionByKubeScheduler" + PodReasonPreemptionByScheduler = "PreemptionByScheduler" ) // PodCondition contains details for the current condition of this pod.