From 0535e7422414601f047ebc52bd0d3992a5e94af2 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Sun, 25 Dec 2022 03:51:42 +0000 Subject: [PATCH] feature(schedule_one): use heap to find the highest score node --- pkg/scheduler/schedule_one.go | 85 ++++++++++++++++--- pkg/scheduler/schedule_one_test.go | 132 ++++++++++++++++++++++------- 2 files changed, 171 insertions(+), 46 deletions(-) diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index eabd07d4c96..b38af7f2ef1 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -17,7 +17,9 @@ limitations under the License. package scheduler import ( + "container/heap" "context" + "errors" "fmt" "math/rand" "strconv" @@ -56,6 +58,9 @@ const ( // to ensure that a certain minimum of nodes are checked for feasibility. // This in turn helps ensure a minimum level of spreading. minFeasibleNodesPercentageToFind = 5 + // numberOfHighestScoredNodesToReport is the number of node scores + // to be included in ScheduleResult. + numberOfHighestScoredNodesToReport = 3 ) // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. @@ -372,7 +377,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework return result, err } - host, err := selectHost(priorityList) + host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport) trace.Step("Prioritizing done") return ScheduleResult{ @@ -744,29 +749,81 @@ func prioritizeNodes( return nodesScores, nil } +var errEmptyPriorityList = errors.New("empty priorityList") + // selectHost takes a prioritized list of nodes and then picks one // in a reservoir sampling manner from the nodes that had the highest score. -func selectHost(nodeScores []framework.NodePluginScores) (string, error) { - if len(nodeScores) == 0 { - return "", fmt.Errorf("empty priorityList") +// It also returns the top {count} Nodes, +// and the top of the list will be always the selected host. +func selectHost(nodeScoreList []framework.NodePluginScores, count int) (string, []framework.NodePluginScores, error) { + if len(nodeScoreList) == 0 { + return "", nil, errEmptyPriorityList } - maxScore := nodeScores[0].TotalScore - selected := nodeScores[0].Name + + var h nodeScoreHeap = nodeScoreList + heap.Init(&h) cntOfMaxScore := 1 - for _, ns := range nodeScores[1:] { - if ns.TotalScore > maxScore { - maxScore = ns.TotalScore - selected = ns.Name - cntOfMaxScore = 1 - } else if ns.TotalScore == maxScore { + selectedIndex := 0 + // The top of the heap is the NodeScoreResult with the highest score. + sortedNodeScoreList := make([]framework.NodePluginScores, 0, count) + sortedNodeScoreList = append(sortedNodeScoreList, heap.Pop(&h).(framework.NodePluginScores)) + + // This for-loop will continue until all Nodes with the highest scores get checked for a reservoir sampling, + // and sortedNodeScoreList gets (count - 1) elements. + for ns := heap.Pop(&h).(framework.NodePluginScores); ; ns = heap.Pop(&h).(framework.NodePluginScores) { + if ns.TotalScore != sortedNodeScoreList[0].TotalScore && len(sortedNodeScoreList) == count { + break + } + + if ns.TotalScore == sortedNodeScoreList[0].TotalScore { cntOfMaxScore++ if rand.Intn(cntOfMaxScore) == 0 { // Replace the candidate with probability of 1/cntOfMaxScore - selected = ns.Name + selectedIndex = cntOfMaxScore - 1 } } + + sortedNodeScoreList = append(sortedNodeScoreList, ns) + + if h.Len() == 0 { + break + } } - return selected, nil + + if selectedIndex != 0 { + // replace the first one with selected one + previous := sortedNodeScoreList[0] + sortedNodeScoreList[0] = sortedNodeScoreList[selectedIndex] + sortedNodeScoreList[selectedIndex] = previous + } + + if len(sortedNodeScoreList) > count { + sortedNodeScoreList = sortedNodeScoreList[:count] + } + + return sortedNodeScoreList[0].Name, sortedNodeScoreList, nil +} + +// nodeScoreHeap is a heap of framework.NodePluginScores. +type nodeScoreHeap []framework.NodePluginScores + +// nodeScoreHeap implements heap.Interface. +var _ heap.Interface = &nodeScoreHeap{} + +func (h nodeScoreHeap) Len() int { return len(h) } +func (h nodeScoreHeap) Less(i, j int) bool { return h[i].TotalScore > h[j].TotalScore } +func (h nodeScoreHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *nodeScoreHeap) Push(x interface{}) { + *h = append(*h, x.(framework.NodePluginScores)) +} + +func (h *nodeScoreHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x } // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous. diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index fd925059c38..19e52d593c7 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -1268,50 +1268,109 @@ func TestUpdatePod(t *testing.T) { } } -func TestSelectHost(t *testing.T) { +func Test_SelectHost(t *testing.T) { tests := []struct { - name string - list []framework.NodePluginScores - possibleHosts sets.Set[string] - expectsErr bool + name string + list []framework.NodePluginScores + topNodesCnt int + possibleNodes sets.Set[string] + possibleNodeLists [][]framework.NodePluginScores + wantError error }{ { name: "unique properly ordered scores", list: []framework.NodePluginScores{ - {Name: "node1.1", TotalScore: 1}, - {Name: "node2.1", TotalScore: 2}, + {Name: "node1", TotalScore: 1}, + {Name: "node2", TotalScore: 2}, + }, + topNodesCnt: 2, + possibleNodes: sets.New("node2"), + possibleNodeLists: [][]framework.NodePluginScores{ + { + {Name: "node2", TotalScore: 2}, + {Name: "node1", TotalScore: 1}, + }, + }, + }, + { + name: "numberOfNodeScoresToReturn > len(list)", + list: []framework.NodePluginScores{ + {Name: "node1", TotalScore: 1}, + {Name: "node2", TotalScore: 2}, + }, + topNodesCnt: 100, + possibleNodes: sets.New("node2"), + possibleNodeLists: [][]framework.NodePluginScores{ + { + {Name: "node2", TotalScore: 2}, + {Name: "node1", TotalScore: 1}, + }, }, - possibleHosts: sets.New("node2.1"), - expectsErr: false, }, { name: "equal scores", list: []framework.NodePluginScores{ - {Name: "node1.1", TotalScore: 1}, - {Name: "node1.2", TotalScore: 2}, - {Name: "node1.3", TotalScore: 2}, {Name: "node2.1", TotalScore: 2}, + {Name: "node2.2", TotalScore: 2}, + {Name: "node2.3", TotalScore: 2}, + }, + topNodesCnt: 2, + possibleNodes: sets.New("node2.1", "node2.2", "node2.3"), + possibleNodeLists: [][]framework.NodePluginScores{ + { + {Name: "node2.1", TotalScore: 2}, + {Name: "node2.2", TotalScore: 2}, + }, + { + {Name: "node2.1", TotalScore: 2}, + {Name: "node2.3", TotalScore: 2}, + }, + { + {Name: "node2.2", TotalScore: 2}, + {Name: "node2.1", TotalScore: 2}, + }, + { + {Name: "node2.2", TotalScore: 2}, + {Name: "node2.3", TotalScore: 2}, + }, + { + {Name: "node2.3", TotalScore: 2}, + {Name: "node2.1", TotalScore: 2}, + }, + { + {Name: "node2.3", TotalScore: 2}, + {Name: "node2.2", TotalScore: 2}, + }, }, - possibleHosts: sets.New("node1.2", "node1.3", "node2.1"), - expectsErr: false, }, { name: "out of order scores", list: []framework.NodePluginScores{ - {Name: "node1.1", TotalScore: 3}, - {Name: "node1.2", TotalScore: 3}, + {Name: "node3.1", TotalScore: 3}, {Name: "node2.1", TotalScore: 2}, - {Name: "node3.1", TotalScore: 1}, - {Name: "node1.3", TotalScore: 3}, + {Name: "node1.1", TotalScore: 1}, + {Name: "node3.2", TotalScore: 3}, + }, + topNodesCnt: 3, + possibleNodes: sets.New("node3.1", "node3.2"), + possibleNodeLists: [][]framework.NodePluginScores{ + { + {Name: "node3.1", TotalScore: 3}, + {Name: "node3.2", TotalScore: 3}, + {Name: "node2.1", TotalScore: 2}, + }, + { + {Name: "node3.2", TotalScore: 3}, + {Name: "node3.1", TotalScore: 3}, + {Name: "node2.1", TotalScore: 2}, + }, }, - possibleHosts: sets.New("node1.1", "node1.2", "node1.3"), - expectsErr: false, }, { name: "empty priority list", list: []framework.NodePluginScores{}, - possibleHosts: sets.New[string](), - expectsErr: true, + possibleNodes: sets.Set[string]{}, + wantError: errEmptyPriorityList, }, } @@ -1319,19 +1378,28 @@ func TestSelectHost(t *testing.T) { t.Run(test.name, func(t *testing.T) { // increase the randomness for i := 0; i < 10; i++ { - got, err := selectHost(test.list) - if test.expectsErr { - if err == nil { - t.Error("Unexpected non-error") + got, scoreList, err := selectHost(test.list, test.topNodesCnt) + if err != test.wantError { + t.Fatalf("unexpected error is returned from selectHost: got: %v want: %v", err, test.wantError) + } + if test.possibleNodes.Len() == 0 { + if got != "" { + t.Fatalf("expected nothing returned as selected Node, but actually %s is returned from selectHost", got) } - } else { - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - if !test.possibleHosts.Has(got) { - t.Errorf("got %s is not in the possible map %v", got, test.possibleHosts) + return + } + if !test.possibleNodes.Has(got) { + t.Errorf("got %s is not in the possible map %v", got, test.possibleNodes) + } + if got != scoreList[0].Name { + t.Errorf("The head of list should be the selected Node's score: got: %v, expected: %v", scoreList[0], got) + } + for _, list := range test.possibleNodeLists { + if cmp.Equal(list, scoreList) { + return } } + t.Errorf("Unexpected scoreList: %v", scoreList) } }) }