Merge pull request #113456 from sanposhiho/use-totalscore-in-NodePluginScores

use TotalScore summarized in NodePluginScores
This commit is contained in:
Kubernetes Prow Robot 2022-12-12 09:01:45 -08:00 committed by GitHub
commit 2e3055863d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 230 additions and 48 deletions

View File

@ -644,15 +644,15 @@ func prioritizeNodes(
state *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
) (framework.NodeScoreList, error) {
) ([]framework.NodePluginScores, error) {
// If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format
if len(extenders) == 0 && !fwk.HasScorePlugins() {
result := make(framework.NodeScoreList, 0, len(nodes))
result := make([]framework.NodePluginScores, 0, len(nodes))
for i := range nodes {
result = append(result, framework.NodeScore{
Name: nodes[i].Name,
Score: 1,
result = append(result, framework.NodePluginScores{
Name: nodes[i].Name,
TotalScore: 1,
})
}
return result, nil
@ -680,16 +680,12 @@ func prioritizeNodes(
}
}
// Summarize all scores.
result := make(framework.NodeScoreList, len(nodes))
for i, pluginScores := range nodesScores {
result[i] = framework.NodeScore{Name: nodes[i].Name, Score: pluginScores.TotalScore}
}
if len(extenders) != 0 && nodes != nil {
// allNodeExtendersScores has all extenders scores for all nodes.
// It is keyed with node name.
allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
var mu sync.Mutex
var wg sync.WaitGroup
combinedScores := make(map[string]int64, len(nodes))
for i := range extenders {
if !extenders[i].IsInterested(pod) {
continue
@ -710,48 +706,65 @@ func prioritizeNodes(
return
}
mu.Lock()
defer mu.Unlock()
for i := range *prioritizedList {
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
nodename := (*prioritizedList)[i].Host
score := (*prioritizedList)[i].Score
if klogV.Enabled() {
klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
}
combinedScores[host] += score * weight
// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
if allNodeExtendersScores[nodename] == nil {
allNodeExtendersScores[nodename] = &framework.NodePluginScores{
Name: nodename,
Scores: make([]framework.PluginScore, 0, len(extenders)),
}
}
allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
Name: extenders[extIndex].Name(),
Score: finalscore,
})
allNodeExtendersScores[nodename].TotalScore += finalscore
}
mu.Unlock()
}(i)
}
// wait for all go routines to finish
wg.Wait()
for i := range result {
// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
for i := range nodesScores {
if score, ok := allNodeExtendersScores[nodes[i].Name]; ok {
nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)
nodesScores[i].TotalScore += score.TotalScore
}
}
}
if klogV.Enabled() {
for i := range result {
klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
for i := range nodesScores {
klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)
}
}
return result, nil
return nodesScores, nil
}
// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
func selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
if len(nodeScoreList) == 0 {
func selectHost(nodeScores []framework.NodePluginScores) (string, error) {
if len(nodeScores) == 0 {
return "", fmt.Errorf("empty priorityList")
}
maxScore := nodeScoreList[0].Score
selected := nodeScoreList[0].Name
maxScore := nodeScores[0].TotalScore
selected := nodeScores[0].Name
cntOfMaxScore := 1
for _, ns := range nodeScoreList[1:] {
if ns.Score > maxScore {
maxScore = ns.Score
for _, ns := range nodeScores[1:] {
if ns.TotalScore > maxScore {
maxScore = ns.TotalScore
selected = ns.Name
cntOfMaxScore = 1
} else if ns.Score == maxScore {
} else if ns.TotalScore == maxScore {
cntOfMaxScore++
if rand.Intn(cntOfMaxScore) == 0 {
// Replace the candidate with probability of 1/cntOfMaxScore

View File

@ -23,6 +23,7 @@ import (
"math"
"reflect"
"regexp"
"sort"
"strconv"
"sync"
"testing"
@ -1270,45 +1271,45 @@ func TestUpdatePod(t *testing.T) {
func TestSelectHost(t *testing.T) {
tests := []struct {
name string
list framework.NodeScoreList
list []framework.NodePluginScores
possibleHosts sets.String
expectsErr bool
}{
{
name: "unique properly ordered scores",
list: []framework.NodeScore{
{Name: "node1.1", Score: 1},
{Name: "node2.1", Score: 2},
list: []framework.NodePluginScores{
{Name: "node1.1", TotalScore: 1},
{Name: "node2.1", TotalScore: 2},
},
possibleHosts: sets.NewString("node2.1"),
expectsErr: false,
},
{
name: "equal scores",
list: []framework.NodeScore{
{Name: "node1.1", Score: 1},
{Name: "node1.2", Score: 2},
{Name: "node1.3", Score: 2},
{Name: "node2.1", Score: 2},
list: []framework.NodePluginScores{
{Name: "node1.1", TotalScore: 1},
{Name: "node1.2", TotalScore: 2},
{Name: "node1.3", TotalScore: 2},
{Name: "node2.1", TotalScore: 2},
},
possibleHosts: sets.NewString("node1.2", "node1.3", "node2.1"),
expectsErr: false,
},
{
name: "out of order scores",
list: []framework.NodeScore{
{Name: "node1.1", Score: 3},
{Name: "node1.2", Score: 3},
{Name: "node2.1", Score: 2},
{Name: "node3.1", Score: 1},
{Name: "node1.3", Score: 3},
list: []framework.NodePluginScores{
{Name: "node1.1", TotalScore: 3},
{Name: "node1.2", TotalScore: 3},
{Name: "node2.1", TotalScore: 2},
{Name: "node3.1", TotalScore: 1},
{Name: "node1.3", TotalScore: 3},
},
possibleHosts: sets.NewString("node1.1", "node1.2", "node1.3"),
expectsErr: false,
},
{
name: "empty priority list",
list: []framework.NodeScore{},
list: []framework.NodePluginScores{},
possibleHosts: sets.NewString(),
expectsErr: true,
},
@ -2343,7 +2344,7 @@ func TestZeroRequest(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
for _, hp := range list {
if hp.Score != test.expectedScore {
if hp.TotalScore != test.expectedScore {
t.Errorf("expected %d for all priorities, got list %#v", test.expectedScore, list)
}
}
@ -2351,6 +2352,174 @@ func TestZeroRequest(t *testing.T) {
}
}
func Test_prioritizeNodes(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
pluginRegistrations []st.RegisterPluginFunc
extenders []st.FakeExtender
want []framework.NodePluginScores
}{
{
name: "the score from all plugins should be recorded in PluginToNodeScores",
pod: &v1.Pod{},
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
pluginRegistrations: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
st.RegisterScorePlugin("Node2Prioritizer", st.NewNode2PrioritizerPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: nil,
want: []framework.NodePluginScores{
{
Name: "node1",
Scores: []framework.PluginScore{
{
Name: "Node2Prioritizer",
Score: 10,
},
{
Name: "NodeResourcesBalancedAllocation",
Score: 100,
},
},
TotalScore: 110,
},
{
Name: "node2",
Scores: []framework.PluginScore{
{
Name: "Node2Prioritizer",
Score: 100,
},
{
Name: "NodeResourcesBalancedAllocation",
Score: 100,
},
},
TotalScore: 200,
},
},
},
{
name: "the score from extender should also be recorded in PluginToNodeScores with plugin scores",
pod: &v1.Pod{},
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
pluginRegistrations: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []st.FakeExtender{
{
ExtenderName: "FakeExtender1",
Weight: 1,
Prioritizers: []st.PriorityConfig{
{
Weight: 3,
Function: st.Node1PrioritizerExtender,
},
},
},
{
ExtenderName: "FakeExtender2",
Weight: 1,
Prioritizers: []st.PriorityConfig{
{
Weight: 2,
Function: st.Node2PrioritizerExtender,
},
},
},
},
want: []framework.NodePluginScores{
{
Name: "node1",
Scores: []framework.PluginScore{
{
Name: "FakeExtender1",
Score: 300,
},
{
Name: "FakeExtender2",
Score: 20,
},
{
Name: "NodeResourcesBalancedAllocation",
Score: 100,
},
},
TotalScore: 420,
},
{
Name: "node2",
Scores: []framework.PluginScore{
{
Name: "FakeExtender1",
Score: 30,
},
{
Name: "FakeExtender2",
Score: 200,
},
{
Name: "NodeResourcesBalancedAllocation",
Score: 100,
},
},
TotalScore: 330,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
snapshot := internalcache.NewSnapshot(test.pods, test.nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
test.pluginRegistrations, "",
ctx.Done(),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
if err != nil {
t.Fatalf("error creating framework: %+v", err)
}
state := framework.NewCycleState()
fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
var extenders []framework.Extender
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
nodesscores, err := prioritizeNodes(ctx, extenders, fwk, state, test.pod, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
for i := range nodesscores {
sort.Slice(nodesscores[i].Scores, func(j, k int) bool {
return nodesscores[i].Scores[j].Name < nodesscores[i].Scores[k].Name
})
}
if diff := cmp.Diff(test.want, nodesscores); diff != "" {
t.Errorf("returned nodes scores (-want,+got):\n%s", diff)
}
})
}
}
var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000)
func TestNumFeasibleNodesToFind(t *testing.T) {