feature(schedule_one): use heap to find the highest score node

This commit is contained in:
Kensei Nakada 2022-12-25 03:51:42 +00:00
parent 6e541a6da7
commit 0535e74224
2 changed files with 171 additions and 46 deletions

View File

@ -17,7 +17,9 @@ limitations under the License.
package scheduler
import (
"container/heap"
"context"
"errors"
"fmt"
"math/rand"
"strconv"
@ -56,6 +58,9 @@ const (
// to ensure that a certain minimum of nodes are checked for feasibility.
// This in turn helps ensure a minimum level of spreading.
minFeasibleNodesPercentageToFind = 5
// numberOfHighestScoredNodesToReport is the number of node scores
// to be included in ScheduleResult.
numberOfHighestScoredNodesToReport = 3
)
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
@ -372,7 +377,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
return result, err
}
host, err := selectHost(priorityList)
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
trace.Step("Prioritizing done")
return ScheduleResult{
@ -744,29 +749,81 @@ func prioritizeNodes(
return nodesScores, nil
}
var errEmptyPriorityList = errors.New("empty priorityList")
// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
func selectHost(nodeScores []framework.NodePluginScores) (string, error) {
if len(nodeScores) == 0 {
return "", fmt.Errorf("empty priorityList")
// It also returns the top {count} Nodes,
// and the top of the list will be always the selected host.
func selectHost(nodeScoreList []framework.NodePluginScores, count int) (string, []framework.NodePluginScores, error) {
if len(nodeScoreList) == 0 {
return "", nil, errEmptyPriorityList
}
maxScore := nodeScores[0].TotalScore
selected := nodeScores[0].Name
var h nodeScoreHeap = nodeScoreList
heap.Init(&h)
cntOfMaxScore := 1
for _, ns := range nodeScores[1:] {
if ns.TotalScore > maxScore {
maxScore = ns.TotalScore
selected = ns.Name
cntOfMaxScore = 1
} else if ns.TotalScore == maxScore {
selectedIndex := 0
// The top of the heap is the NodeScoreResult with the highest score.
sortedNodeScoreList := make([]framework.NodePluginScores, 0, count)
sortedNodeScoreList = append(sortedNodeScoreList, heap.Pop(&h).(framework.NodePluginScores))
// This for-loop will continue until all Nodes with the highest scores get checked for a reservoir sampling,
// and sortedNodeScoreList gets (count - 1) elements.
for ns := heap.Pop(&h).(framework.NodePluginScores); ; ns = heap.Pop(&h).(framework.NodePluginScores) {
if ns.TotalScore != sortedNodeScoreList[0].TotalScore && len(sortedNodeScoreList) == count {
break
}
if ns.TotalScore == sortedNodeScoreList[0].TotalScore {
cntOfMaxScore++
if rand.Intn(cntOfMaxScore) == 0 {
// Replace the candidate with probability of 1/cntOfMaxScore
selected = ns.Name
selectedIndex = cntOfMaxScore - 1
}
}
sortedNodeScoreList = append(sortedNodeScoreList, ns)
if h.Len() == 0 {
break
}
}
return selected, nil
if selectedIndex != 0 {
// replace the first one with selected one
previous := sortedNodeScoreList[0]
sortedNodeScoreList[0] = sortedNodeScoreList[selectedIndex]
sortedNodeScoreList[selectedIndex] = previous
}
if len(sortedNodeScoreList) > count {
sortedNodeScoreList = sortedNodeScoreList[:count]
}
return sortedNodeScoreList[0].Name, sortedNodeScoreList, nil
}
// nodeScoreHeap is a heap of framework.NodePluginScores.
type nodeScoreHeap []framework.NodePluginScores
// nodeScoreHeap implements heap.Interface.
var _ heap.Interface = &nodeScoreHeap{}
func (h nodeScoreHeap) Len() int { return len(h) }
func (h nodeScoreHeap) Less(i, j int) bool { return h[i].TotalScore > h[j].TotalScore }
func (h nodeScoreHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *nodeScoreHeap) Push(x interface{}) {
*h = append(*h, x.(framework.NodePluginScores))
}
func (h *nodeScoreHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.

View File

@ -1268,50 +1268,109 @@ func TestUpdatePod(t *testing.T) {
}
}
func TestSelectHost(t *testing.T) {
func Test_SelectHost(t *testing.T) {
tests := []struct {
name string
list []framework.NodePluginScores
possibleHosts sets.Set[string]
expectsErr bool
name string
list []framework.NodePluginScores
topNodesCnt int
possibleNodes sets.Set[string]
possibleNodeLists [][]framework.NodePluginScores
wantError error
}{
{
name: "unique properly ordered scores",
list: []framework.NodePluginScores{
{Name: "node1.1", TotalScore: 1},
{Name: "node2.1", TotalScore: 2},
{Name: "node1", TotalScore: 1},
{Name: "node2", TotalScore: 2},
},
topNodesCnt: 2,
possibleNodes: sets.New("node2"),
possibleNodeLists: [][]framework.NodePluginScores{
{
{Name: "node2", TotalScore: 2},
{Name: "node1", TotalScore: 1},
},
},
},
{
name: "numberOfNodeScoresToReturn > len(list)",
list: []framework.NodePluginScores{
{Name: "node1", TotalScore: 1},
{Name: "node2", TotalScore: 2},
},
topNodesCnt: 100,
possibleNodes: sets.New("node2"),
possibleNodeLists: [][]framework.NodePluginScores{
{
{Name: "node2", TotalScore: 2},
{Name: "node1", TotalScore: 1},
},
},
possibleHosts: sets.New("node2.1"),
expectsErr: false,
},
{
name: "equal scores",
list: []framework.NodePluginScores{
{Name: "node1.1", TotalScore: 1},
{Name: "node1.2", TotalScore: 2},
{Name: "node1.3", TotalScore: 2},
{Name: "node2.1", TotalScore: 2},
{Name: "node2.2", TotalScore: 2},
{Name: "node2.3", TotalScore: 2},
},
topNodesCnt: 2,
possibleNodes: sets.New("node2.1", "node2.2", "node2.3"),
possibleNodeLists: [][]framework.NodePluginScores{
{
{Name: "node2.1", TotalScore: 2},
{Name: "node2.2", TotalScore: 2},
},
{
{Name: "node2.1", TotalScore: 2},
{Name: "node2.3", TotalScore: 2},
},
{
{Name: "node2.2", TotalScore: 2},
{Name: "node2.1", TotalScore: 2},
},
{
{Name: "node2.2", TotalScore: 2},
{Name: "node2.3", TotalScore: 2},
},
{
{Name: "node2.3", TotalScore: 2},
{Name: "node2.1", TotalScore: 2},
},
{
{Name: "node2.3", TotalScore: 2},
{Name: "node2.2", TotalScore: 2},
},
},
possibleHosts: sets.New("node1.2", "node1.3", "node2.1"),
expectsErr: false,
},
{
name: "out of order scores",
list: []framework.NodePluginScores{
{Name: "node1.1", TotalScore: 3},
{Name: "node1.2", TotalScore: 3},
{Name: "node3.1", TotalScore: 3},
{Name: "node2.1", TotalScore: 2},
{Name: "node3.1", TotalScore: 1},
{Name: "node1.3", TotalScore: 3},
{Name: "node1.1", TotalScore: 1},
{Name: "node3.2", TotalScore: 3},
},
topNodesCnt: 3,
possibleNodes: sets.New("node3.1", "node3.2"),
possibleNodeLists: [][]framework.NodePluginScores{
{
{Name: "node3.1", TotalScore: 3},
{Name: "node3.2", TotalScore: 3},
{Name: "node2.1", TotalScore: 2},
},
{
{Name: "node3.2", TotalScore: 3},
{Name: "node3.1", TotalScore: 3},
{Name: "node2.1", TotalScore: 2},
},
},
possibleHosts: sets.New("node1.1", "node1.2", "node1.3"),
expectsErr: false,
},
{
name: "empty priority list",
list: []framework.NodePluginScores{},
possibleHosts: sets.New[string](),
expectsErr: true,
possibleNodes: sets.Set[string]{},
wantError: errEmptyPriorityList,
},
}
@ -1319,19 +1378,28 @@ func TestSelectHost(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// increase the randomness
for i := 0; i < 10; i++ {
got, err := selectHost(test.list)
if test.expectsErr {
if err == nil {
t.Error("Unexpected non-error")
got, scoreList, err := selectHost(test.list, test.topNodesCnt)
if err != test.wantError {
t.Fatalf("unexpected error is returned from selectHost: got: %v want: %v", err, test.wantError)
}
if test.possibleNodes.Len() == 0 {
if got != "" {
t.Fatalf("expected nothing returned as selected Node, but actually %s is returned from selectHost", got)
}
} else {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !test.possibleHosts.Has(got) {
t.Errorf("got %s is not in the possible map %v", got, test.possibleHosts)
return
}
if !test.possibleNodes.Has(got) {
t.Errorf("got %s is not in the possible map %v", got, test.possibleNodes)
}
if got != scoreList[0].Name {
t.Errorf("The head of list should be the selected Node's score: got: %v, expected: %v", scoreList[0], got)
}
for _, list := range test.possibleNodeLists {
if cmp.Equal(list, scoreList) {
return
}
}
t.Errorf("Unexpected scoreList: %v", scoreList)
}
})
}