From 9fd15f1fa3eb4c94a0675a6519f2eb8727ea97dd Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Sun, 30 Oct 2022 23:17:27 +0000 Subject: [PATCH] use TotalScore summarized in NodePluginScores --- pkg/scheduler/schedule_one.go | 75 ++++++----- pkg/scheduler/schedule_one_test.go | 203 ++++++++++++++++++++++++++--- 2 files changed, 230 insertions(+), 48 deletions(-) diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 4a45f5b4adb..11705f69b63 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -642,15 +642,15 @@ func prioritizeNodes( state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, -) (framework.NodeScoreList, error) { +) ([]framework.NodePluginScores, error) { // If no priority configs are provided, then all nodes will have a score of one. // This is required to generate the priority list in the required format if len(extenders) == 0 && !fwk.HasScorePlugins() { - result := make(framework.NodeScoreList, 0, len(nodes)) + result := make([]framework.NodePluginScores, 0, len(nodes)) for i := range nodes { - result = append(result, framework.NodeScore{ - Name: nodes[i].Name, - Score: 1, + result = append(result, framework.NodePluginScores{ + Name: nodes[i].Name, + TotalScore: 1, }) } return result, nil @@ -678,16 +678,12 @@ func prioritizeNodes( } } - // Summarize all scores. - result := make(framework.NodeScoreList, len(nodes)) - for i, pluginScores := range nodesScores { - result[i] = framework.NodeScore{Name: nodes[i].Name, Score: pluginScores.TotalScore} - } - if len(extenders) != 0 && nodes != nil { + // allNodeExtendersScores has all extenders scores for all nodes. + // It is keyed with node name. + allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes)) var mu sync.Mutex var wg sync.WaitGroup - combinedScores := make(map[string]int64, len(nodes)) for i := range extenders { if !extenders[i].IsInterested(pod) { continue @@ -708,48 +704,65 @@ func prioritizeNodes( return } mu.Lock() + defer mu.Unlock() for i := range *prioritizedList { - host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score + nodename := (*prioritizedList)[i].Host + score := (*prioritizedList)[i].Score if klogV.Enabled() { - klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score) + klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score) } - combinedScores[host] += score * weight + + // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, + // therefore we need to scale the score returned by extenders to the score range used by the scheduler. + finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority) + + if allNodeExtendersScores[nodename] == nil { + allNodeExtendersScores[nodename] = &framework.NodePluginScores{ + Name: nodename, + Scores: make([]framework.PluginScore, 0, len(extenders)), + } + } + allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{ + Name: extenders[extIndex].Name(), + Score: finalscore, + }) + allNodeExtendersScores[nodename].TotalScore += finalscore } - mu.Unlock() }(i) } // wait for all go routines to finish wg.Wait() - for i := range result { - // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, - // therefore we need to scale the score returned by extenders to the score range used by the scheduler. - result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority) + for i := range nodesScores { + if score, ok := allNodeExtendersScores[nodes[i].Name]; ok { + nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...) + nodesScores[i].TotalScore += score.TotalScore + } } } if klogV.Enabled() { - for i := range result { - klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score) + for i := range nodesScores { + klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore) } } - return result, nil + return nodesScores, nil } // selectHost takes a prioritized list of nodes and then picks one // in a reservoir sampling manner from the nodes that had the highest score. -func selectHost(nodeScoreList framework.NodeScoreList) (string, error) { - if len(nodeScoreList) == 0 { +func selectHost(nodeScores []framework.NodePluginScores) (string, error) { + if len(nodeScores) == 0 { return "", fmt.Errorf("empty priorityList") } - maxScore := nodeScoreList[0].Score - selected := nodeScoreList[0].Name + maxScore := nodeScores[0].TotalScore + selected := nodeScores[0].Name cntOfMaxScore := 1 - for _, ns := range nodeScoreList[1:] { - if ns.Score > maxScore { - maxScore = ns.Score + for _, ns := range nodeScores[1:] { + if ns.TotalScore > maxScore { + maxScore = ns.TotalScore selected = ns.Name cntOfMaxScore = 1 - } else if ns.Score == maxScore { + } else if ns.TotalScore == maxScore { cntOfMaxScore++ if rand.Intn(cntOfMaxScore) == 0 { // Replace the candidate with probability of 1/cntOfMaxScore diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 3a28a60676f..1e7eefdbebe 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -23,6 +23,7 @@ import ( "math" "reflect" "regexp" + "sort" "strconv" "sync" "testing" @@ -1261,45 +1262,45 @@ func TestUpdatePod(t *testing.T) { func TestSelectHost(t *testing.T) { tests := []struct { name string - list framework.NodeScoreList + list []framework.NodePluginScores possibleHosts sets.String expectsErr bool }{ { name: "unique properly ordered scores", - list: []framework.NodeScore{ - {Name: "node1.1", Score: 1}, - {Name: "node2.1", Score: 2}, + list: []framework.NodePluginScores{ + {Name: "node1.1", TotalScore: 1}, + {Name: "node2.1", TotalScore: 2}, }, possibleHosts: sets.NewString("node2.1"), expectsErr: false, }, { name: "equal scores", - list: []framework.NodeScore{ - {Name: "node1.1", Score: 1}, - {Name: "node1.2", Score: 2}, - {Name: "node1.3", Score: 2}, - {Name: "node2.1", Score: 2}, + list: []framework.NodePluginScores{ + {Name: "node1.1", TotalScore: 1}, + {Name: "node1.2", TotalScore: 2}, + {Name: "node1.3", TotalScore: 2}, + {Name: "node2.1", TotalScore: 2}, }, possibleHosts: sets.NewString("node1.2", "node1.3", "node2.1"), expectsErr: false, }, { name: "out of order scores", - list: []framework.NodeScore{ - {Name: "node1.1", Score: 3}, - {Name: "node1.2", Score: 3}, - {Name: "node2.1", Score: 2}, - {Name: "node3.1", Score: 1}, - {Name: "node1.3", Score: 3}, + list: []framework.NodePluginScores{ + {Name: "node1.1", TotalScore: 3}, + {Name: "node1.2", TotalScore: 3}, + {Name: "node2.1", TotalScore: 2}, + {Name: "node3.1", TotalScore: 1}, + {Name: "node1.3", TotalScore: 3}, }, possibleHosts: sets.NewString("node1.1", "node1.2", "node1.3"), expectsErr: false, }, { name: "empty priority list", - list: []framework.NodeScore{}, + list: []framework.NodePluginScores{}, possibleHosts: sets.NewString(), expectsErr: true, }, @@ -2334,7 +2335,7 @@ func TestZeroRequest(t *testing.T) { t.Errorf("unexpected error: %v", err) } for _, hp := range list { - if hp.Score != test.expectedScore { + if hp.TotalScore != test.expectedScore { t.Errorf("expected %d for all priorities, got list %#v", test.expectedScore, list) } } @@ -2342,6 +2343,174 @@ func TestZeroRequest(t *testing.T) { } } +func Test_prioritizeNodes(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + pods []*v1.Pod + nodes []*v1.Node + pluginRegistrations []st.RegisterPluginFunc + extenders []st.FakeExtender + want []framework.NodePluginScores + }{ + { + name: "the score from all plugins should be recorded in PluginToNodeScores", + pod: &v1.Pod{}, + nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)}, + pluginRegistrations: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1), + st.RegisterScorePlugin("Node2Prioritizer", st.NewNode2PrioritizerPlugin(), 1), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, + extenders: nil, + want: []framework.NodePluginScores{ + { + Name: "node1", + Scores: []framework.PluginScore{ + { + Name: "Node2Prioritizer", + Score: 10, + }, + { + Name: "NodeResourcesBalancedAllocation", + Score: 100, + }, + }, + TotalScore: 110, + }, + { + Name: "node2", + Scores: []framework.PluginScore{ + { + Name: "Node2Prioritizer", + Score: 100, + }, + { + Name: "NodeResourcesBalancedAllocation", + Score: 100, + }, + }, + TotalScore: 200, + }, + }, + }, + { + name: "the score from extender should also be recorded in PluginToNodeScores with plugin scores", + pod: &v1.Pod{}, + nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)}, + pluginRegistrations: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, + extenders: []st.FakeExtender{ + { + ExtenderName: "FakeExtender1", + Weight: 1, + Prioritizers: []st.PriorityConfig{ + { + Weight: 3, + Function: st.Node1PrioritizerExtender, + }, + }, + }, + { + ExtenderName: "FakeExtender2", + Weight: 1, + Prioritizers: []st.PriorityConfig{ + { + Weight: 2, + Function: st.Node2PrioritizerExtender, + }, + }, + }, + }, + want: []framework.NodePluginScores{ + { + Name: "node1", + Scores: []framework.PluginScore{ + + { + Name: "FakeExtender1", + Score: 300, + }, + { + Name: "FakeExtender2", + Score: 20, + }, + { + Name: "NodeResourcesBalancedAllocation", + Score: 100, + }, + }, + TotalScore: 420, + }, + { + Name: "node2", + Scores: []framework.PluginScore{ + { + Name: "FakeExtender1", + Score: 30, + }, + { + Name: "FakeExtender2", + Score: 200, + }, + { + Name: "NodeResourcesBalancedAllocation", + Score: 100, + }, + }, + TotalScore: 330, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := clientsetfake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + + snapshot := internalcache.NewSnapshot(test.pods, test.nodes) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fwk, err := st.NewFramework( + test.pluginRegistrations, "", + ctx.Done(), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + frameworkruntime.WithClientSet(client), + frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + ) + if err != nil { + t.Fatalf("error creating framework: %+v", err) + } + + state := framework.NewCycleState() + fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes) + var extenders []framework.Extender + for ii := range test.extenders { + extenders = append(extenders, &test.extenders[ii]) + } + nodesscores, err := prioritizeNodes(ctx, extenders, fwk, state, test.pod, test.nodes) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + for i := range nodesscores { + sort.Slice(nodesscores[i].Scores, func(j, k int) bool { + return nodesscores[i].Scores[j].Name < nodesscores[i].Scores[k].Name + }) + } + + if diff := cmp.Diff(test.want, nodesscores); diff != "" { + t.Errorf("returned nodes scores (-want,+got):\n%s", diff) + } + }) + } +} + var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000) func TestNumFeasibleNodesToFind(t *testing.T) {