diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
index 3d896641469..660f557bb48 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
@@ -264,6 +264,11 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNo
 	return true, ""
 }
 
+// OrderedScoreFuncs returns a list of ordered score functions to select the preferable node where victims will be preempted.
+func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
+	return nil
+}
+
 // podTerminatingByPreemption returns the pod's terminating state if feature PodDisruptionConditions is not enabled.
 // Otherwise, it additionally checks if the termination state is caused by scheduler preemption.
 func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool {
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
index 0e24a7e9d54..f67600fa5f3 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
@@ -1389,7 +1389,7 @@ func TestSelectBestCandidate(t *testing.T) {
 			}
 			offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
 			candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
-			s := pe.SelectCandidate(logger, candidates)
+			s := pe.SelectCandidate(ctx, candidates)
 			if s == nil || len(s.Name()) == 0 {
 				return
 			}
diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index e9442e949eb..376b6337e99 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -116,6 +116,10 @@ type Interface interface {
 	// Note that both `state` and `nodeInfo` are deep copied.
 	SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
 		pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
+	// OrderedScoreFuncs returns a list of ordered score functions to select the preferable node where victims will be preempted.
+	// The ordered score functions will be processed one by one if and only if more than one node has the highest score.
+	// Default score functions will be processed if nil is returned here, for backwards compatibility.
+	OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64
 }
 
 type Evaluator struct {
@@ -190,7 +194,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
 	}
 
 	// 4) Find the best candidate.
-	bestCandidate := ev.SelectCandidate(logger, candidates)
+	bestCandidate := ev.SelectCandidate(ctx, candidates)
 	if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
 		return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
 	}
@@ -309,7 +313,9 @@ func (ev *Evaluator) callExtenders(logger klog.Logger, pod *v1.Pod, candidates [
 // SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
 // NOTE: This method is exported for easier testing in default preemption.
-func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate) Candidate {
+func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate) Candidate {
+	logger := klog.FromContext(ctx)
+
 	if len(candidates) == 0 {
 		return nil
 	}
@@ -318,7 +324,8 @@ func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate)
 	}
 
 	victimsMap := ev.CandidatesToVictimsMap(candidates)
-	candidateNode := pickOneNodeForPreemption(logger, victimsMap)
+	scoreFuncs := ev.OrderedScoreFuncs(ctx, victimsMap)
+	candidateNode := pickOneNodeForPreemption(logger, victimsMap, scoreFuncs)
 
 	// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
 	// preemption plugins that exercise different candidates on the same nominated node.
@@ -428,8 +435,10 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
 	return nil, nil
 }
 
-// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
-// pods in each map entry are ordered by decreasing priority.
+// pickOneNodeForPreemption chooses one node among the given nodes.
+// It assumes pods in each map entry are ordered by decreasing priority.
+// If scoreFuncs is not empty, it picks a node based on the scores that scoreFuncs return.
+// If scoreFuncs is empty,
 // It picks a node based on the following criteria:
 // 1. A node with minimum number of PDB violations.
 // 2. A node with minimum highest priority victim is picked.
@@ -439,7 +448,7 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
 // 6. If there are still ties, the first such node is picked (sort of randomly).
 // The 'minNodes1' and 'minNodes2' are being reused here to save the memory
 // allocation and garbage collection time.
-func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims) string {
+func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims, scoreFuncs []func(node string) int64) string {
 	if len(nodesToVictims) == 0 {
 		return ""
 	}
@@ -449,58 +458,60 @@ func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*ext
 		allCandidates = append(allCandidates, node)
 	}
 
-	minNumPDBViolatingScoreFunc := func(node string) int64 {
-		// The smaller the NumPDBViolations, the higher the score.
-		return -nodesToVictims[node].NumPDBViolations
-	}
-	minHighestPriorityScoreFunc := func(node string) int64 {
-		// highestPodPriority is the highest priority among the victims on this node.
-		highestPodPriority := corev1helpers.PodPriority(nodesToVictims[node].Pods[0])
-		// The smaller the highestPodPriority, the higher the score.
-		return -int64(highestPodPriority)
-	}
-	minSumPrioritiesScoreFunc := func(node string) int64 {
-		var sumPriorities int64
-		for _, pod := range nodesToVictims[node].Pods {
-			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
-			// needed so that a node with a few pods with negative priority is not
-			// picked over a node with a smaller number of pods with the same negative
-			// priority (and similar scenarios).
-			sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
+	if len(scoreFuncs) == 0 {
+		minNumPDBViolatingScoreFunc := func(node string) int64 {
+			// The smaller the NumPDBViolations, the higher the score.
+			return -nodesToVictims[node].NumPDBViolations
 		}
-		// The smaller the sumPriorities, the higher the score.
-		return -sumPriorities
-	}
-	minNumPodsScoreFunc := func(node string) int64 {
-		// The smaller the length of pods, the higher the score.
-		return -int64(len(nodesToVictims[node].Pods))
-	}
-	latestStartTimeScoreFunc := func(node string) int64 {
-		// Get earliest start time of all pods on the current node.
-		earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
-		if earliestStartTimeOnNode == nil {
-			logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
-			return int64(math.MinInt64)
+		minHighestPriorityScoreFunc := func(node string) int64 {
+			// highestPodPriority is the highest priority among the victims on this node.
+			highestPodPriority := corev1helpers.PodPriority(nodesToVictims[node].Pods[0])
+			// The smaller the highestPodPriority, the higher the score.
+			return -int64(highestPodPriority)
+		}
+		minSumPrioritiesScoreFunc := func(node string) int64 {
+			var sumPriorities int64
+			for _, pod := range nodesToVictims[node].Pods {
+				// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
+				// needed so that a node with a few pods with negative priority is not
+				// picked over a node with a smaller number of pods with the same negative
+				// priority (and similar scenarios).
+				sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
+			}
+			// The smaller the sumPriorities, the higher the score.
+			return -sumPriorities
+		}
+		minNumPodsScoreFunc := func(node string) int64 {
+			// The smaller the length of pods, the higher the score.
+			return -int64(len(nodesToVictims[node].Pods))
+		}
+		latestStartTimeScoreFunc := func(node string) int64 {
+			// Get the earliest start time of all pods on the current node.
+			earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
+			if earliestStartTimeOnNode == nil {
+				logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
+				return int64(math.MinInt64)
+			}
+			// The bigger the earliestStartTimeOnNode, the higher the score.
+			return earliestStartTimeOnNode.UnixNano()
 		}
-		// The bigger the earliestStartTimeOnNode, the higher the score.
-		return earliestStartTimeOnNode.UnixNano()
-	}
 
-	// Each scoreFunc scores the nodes according to specific rules and keeps the name of the node
-	// with the highest score. If and only if the scoreFunc has more than one node with the highest
-	// score, we will execute the other scoreFunc in order of precedence.
-	scoreFuncs := []func(string) int64{
-		// A node with a minimum number of PDB is preferable.
-		minNumPDBViolatingScoreFunc,
-		// A node with a minimum highest priority victim is preferable.
-		minHighestPriorityScoreFunc,
-		// A node with the smallest sum of priorities is preferable.
-		minSumPrioritiesScoreFunc,
-		// A node with the minimum number of pods is preferable.
-		minNumPodsScoreFunc,
-		// A node with the latest start time of all highest priority victims is preferable.
-		latestStartTimeScoreFunc,
-		// If there are still ties, then the first Node in the list is selected.
+		// Each scoreFunc scores the nodes according to specific rules and keeps the name of the node
+		// with the highest score. If and only if the scoreFunc has more than one node with the highest
+		// score, we will execute the other scoreFunc in order of precedence.
+		scoreFuncs = []func(string) int64{
+			// A node with a minimum number of PDB violations is preferable.
+			minNumPDBViolatingScoreFunc,
+			// A node with a minimum highest priority victim is preferable.
+			minHighestPriorityScoreFunc,
+			// A node with the smallest sum of priorities is preferable.
+			minSumPrioritiesScoreFunc,
+			// A node with the minimum number of pods is preferable.
+			minNumPodsScoreFunc,
+			// A node with the latest start time of all highest priority victims is preferable.
+			latestStartTimeScoreFunc,
+			// If there are still ties, then the first Node in the list is selected.
+		}
 	}
 
 	for _, f := range scoreFuncs {
diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go
index 632f3dd9ea1..5ddf60c1997 100644
--- a/pkg/scheduler/framework/preemption/preemption_test.go
+++ b/pkg/scheduler/framework/preemption/preemption_test.go
@@ -82,6 +82,47 @@ func (pl *FakePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominate
 	return true, ""
 }
 
+func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
+	return nil
+}
+
+type FakePreemptionScorePostFilterPlugin struct{}
+
+func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode(
+	ctx context.Context, state *framework.CycleState, pod *v1.Pod,
+	nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
+	return append(victims, nodeInfo.Pods[0].Pod), 1, nil
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
+	return 0, nodes
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
+	m := make(map[string]*extenderv1.Victims, len(candidates))
+	for _, c := range candidates {
+		m[c.Name()] = c.Victims()
+	}
+	return m
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
+	return true, ""
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
+	return []func(string) int64{
+		func(node string) int64 {
+			var sumContainers int64
+			for _, pod := range nodesToVictims[node].Pods {
+				sumContainers += int64(len(pod.Spec.Containers) + len(pod.Spec.InitContainers))
+			}
+			// The smaller the sumContainers, the higher the score.
+			return -sumContainers
+		},
+	}
+}
+
 func TestNodesWherePreemptionMightHelp(t *testing.T) {
 	// Prepare 4 node names.
 	nodeNames := []string{"node1", "node2", "node3", "node4"}
@@ -337,3 +378,99 @@ func TestDryRunPreemption(t *testing.T) {
 		})
 	}
 }
+
+func TestSelectCandidate(t *testing.T) {
+	tests := []struct {
+		name      string
+		nodeNames []string
+		pod       *v1.Pod
+		testPods  []*v1.Pod
+		expected  string
+	}{
+		{
+			name:      "pod has different number of containers on each node",
+			nodeNames: []string{"node1", "node2", "node3"},
+			pod:       st.MakePod().Name("p").UID("p").Priority(highPriority).Req(veryLargeRes).Obj(),
+			testPods: []*v1.Pod{
+				st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(midPriority).Containers([]v1.Container{
+					st.MakeContainer().Name("container1").Obj(),
+					st.MakeContainer().Name("container2").Obj(),
+				}).Obj(),
+				st.MakePod().Name("p2.1").UID("p2.1").Node("node2").Priority(midPriority).Containers([]v1.Container{
+					st.MakeContainer().Name("container1").Obj(),
+				}).Obj(),
+				st.MakePod().Name("p3.1").UID("p3.1").Node("node3").Priority(midPriority).Containers([]v1.Container{
+					st.MakeContainer().Name("container1").Obj(),
+					st.MakeContainer().Name("container2").Obj(),
+					st.MakeContainer().Name("container3").Obj(),
+				}).Obj(),
+			},
+			expected: "node2",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			logger, ctx := ktesting.NewTestContext(t)
+			nodes := make([]*v1.Node, len(tt.nodeNames))
+			for i, nodeName := range tt.nodeNames {
+				nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
+			}
+			registeredPlugins := append([]tf.RegisterPluginFunc{
+				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
+				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
+			)
+			var objs []runtime.Object
+			objs = append(objs, tt.pod)
+			for _, pod := range tt.testPods {
+				objs = append(objs, pod)
+			}
+			informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
+			snapshot := internalcache.NewSnapshot(tt.testPods, nodes)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			fwk, err := tf.NewFramework(
+				ctx,
+				registeredPlugins,
+				"",
+				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+				frameworkruntime.WithSnapshotSharedLister(snapshot),
+				frameworkruntime.WithLogger(logger),
+			)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			state := framework.NewCycleState()
+			// Some tests rely on PreFilter plugin to compute its CycleState.
+			if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
+				t.Errorf("Unexpected PreFilter Status: %v", status)
+			}
+			nodeInfos, err := snapshot.NodeInfos().List()
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
+
+			for _, pod := range tt.testPods {
+				state := framework.NewCycleState()
+				pe := Evaluator{
+					PluginName: "FakePreemptionScorePostFilter",
+					Handler:    fwk,
+					Interface:  fakePreemptionScorePostFilterPlugin,
+					State:      state,
+				}
+				candidates, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
+				s := pe.SelectCandidate(ctx, candidates)
+				if s == nil || len(s.Name()) == 0 {
+					t.Errorf("expected node %q, but no candidate was selected", tt.expected)
+					return
+				}
+				if diff := cmp.Diff(tt.expected, s.Name()); diff != "" {
+					t.Errorf("unexpected candidate node (-want, +got):\n%s", diff)
+				}
+			}
+		})
+	}
+}
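
Reviewer note on the `for _, f := range scoreFuncs` loop that the reindented hunk of `pickOneNodeForPreemption` feeds into: each score function narrows the candidate set, and the next function runs only while more than one node remains tied at the top score. A minimal, self-contained sketch of that cascade, for reference while reviewing (the helper and variable names here are illustrative, not part of this patch):

```go
package main

import (
	"fmt"
	"math"
)

// pickByOrderedScores applies score functions in order of precedence.
// A later function runs only if the earlier ones left more than one
// candidate tied at the highest score.
func pickByOrderedScores(candidates []string, scoreFuncs []func(node string) int64) string {
	if len(candidates) == 0 {
		return ""
	}
	for _, f := range scoreFuncs {
		var selected []string
		maxScore := int64(math.MinInt64)
		for _, node := range candidates {
			score := f(node)
			switch {
			case score > maxScore:
				maxScore = score
				selected = []string{node}
			case score == maxScore:
				selected = append(selected, node)
			}
		}
		if len(selected) == 1 {
			return selected[0]
		}
		candidates = selected // still tied; let the next function break the tie
	}
	return candidates[0] // all functions exhausted with a tie: first node wins
}

func main() {
	victims := map[string]int64{"node1": 2, "node2": 1, "node3": 2}
	fewestVictims := func(node string) int64 { return -victims[node] } // fewer victims => higher score
	fmt.Println(pickByOrderedScores([]string{"node1", "node2", "node3"},
		[]func(string) int64{fewestVictims})) // prints: node2
}
```

Because the default functions are assigned into the same `scoreFuncs` slice when the plugin returns nil, the nil case and the plugin-supplied case are consumed by exactly the same loop.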
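For completeness, how an out-of-tree plugin might use the new hook: returning nil (as `DefaultPreemption` and `FakePostFilterPlugin` do above) keeps the built-in criteria, while a non-nil slice replaces them, ordered by precedence. A hypothetical fragment — `CheapestVictimsPlugin` is invented for illustration, it is assumed to implement the rest of `preemption.Interface`, and only the `extenderv1.Victims` fields already visible in this diff (`Pods`, `NumPDBViolations`) are used:

```go
// Hypothetical out-of-tree preemption plugin; only the new method is shown.
type CheapestVictimsPlugin struct{}

func (pl *CheapestVictimsPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
	return []func(node string) int64{
		// Primary criterion: prefer the node with the fewest victim pods.
		func(node string) int64 {
			return -int64(len(nodesToVictims[node].Pods))
		},
		// Tie-breaker: fewest PDB violations. This runs only if the first
		// criterion leaves more than one node with the top score.
		func(node string) int64 {
			return -nodesToVictims[node].NumPDBViolations
		},
	}
}
```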