Only run Prioritize() for extenders with prioritizeVerb configured

This commit is contained in:
Aleksandra Malinowska 2023-11-28 10:55:37 +01:00
parent 56dcde8627
commit 3df00d1bdd
2 changed files with 19 additions and 14 deletions

View File

@ -420,7 +420,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
}, nil }, nil
} }
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes) priorityList, err := sched.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -704,9 +704,8 @@ func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Exten
// The scores from each plugin are added together to make the score for that node, then // The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well. // any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes // All scores are finally combined (added) to get the total weighted scores of all nodes
func prioritizeNodes( func (sched *Scheduler) prioritizeNodes(
ctx context.Context, ctx context.Context,
extenders []framework.Extender,
fwk framework.Framework, fwk framework.Framework,
state *framework.CycleState, state *framework.CycleState,
pod *v1.Pod, pod *v1.Pod,
@ -715,7 +714,7 @@ func prioritizeNodes(
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
// If no priority configs are provided, then all nodes will have a score of one. // If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format // This is required to generate the priority list in the required format
if len(extenders) == 0 && !fwk.HasScorePlugins() { if !sched.hasScoring(fwk) {
result := make([]framework.NodePluginScores, 0, len(nodes)) result := make([]framework.NodePluginScores, 0, len(nodes))
for i := range nodes { for i := range nodes {
result = append(result, framework.NodePluginScores{ result = append(result, framework.NodePluginScores{
@ -748,14 +747,17 @@ func prioritizeNodes(
} }
} }
if len(extenders) != 0 && nodes != nil { if len(sched.Extenders) != 0 && nodes != nil {
// allNodeExtendersScores has all extenders scores for all nodes. // allNodeExtendersScores has all extenders scores for all nodes.
// It is keyed with node name. // It is keyed with node name.
allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes)) allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
var mu sync.Mutex var mu sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
for i := range extenders { for i := range sched.Extenders {
if !extenders[i].IsInterested(pod) { if !sched.Extenders[i].IsInterested(pod) {
continue
}
if !sched.Extenders[i].IsPrioritizer() {
continue continue
} }
wg.Add(1) wg.Add(1)
@ -765,10 +767,10 @@ func prioritizeNodes(
metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec() metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
wg.Done() wg.Done()
}() }()
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes) prioritizedList, weight, err := sched.Extenders[extIndex].Prioritize(pod, nodes)
if err != nil { if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name()) logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", sched.Extenders[extIndex].Name())
return return
} }
mu.Lock() mu.Lock()
@ -777,7 +779,7 @@ func prioritizeNodes(
nodename := (*prioritizedList)[i].Host nodename := (*prioritizedList)[i].Host
score := (*prioritizedList)[i].Score score := (*prioritizedList)[i].Score
if loggerVTen.Enabled() { if loggerVTen.Enabled() {
loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score) loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", sched.Extenders[extIndex].Name(), "node", nodename, "score", score)
} }
// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
@ -787,11 +789,11 @@ func prioritizeNodes(
if allNodeExtendersScores[nodename] == nil { if allNodeExtendersScores[nodename] == nil {
allNodeExtendersScores[nodename] = &framework.NodePluginScores{ allNodeExtendersScores[nodename] = &framework.NodePluginScores{
Name: nodename, Name: nodename,
Scores: make([]framework.PluginScore, 0, len(extenders)), Scores: make([]framework.PluginScore, 0, len(sched.Extenders)),
} }
} }
allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{ allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
Name: extenders[extIndex].Name(), Name: sched.Extenders[extIndex].Name(),
Score: finalscore, Score: finalscore,
}) })
allNodeExtendersScores[nodename].TotalScore += finalscore allNodeExtendersScores[nodename].TotalScore += finalscore

View File

@ -2700,7 +2700,7 @@ func TestZeroRequest(t *testing.T) {
t.Fatalf("error filtering nodes: %+v", err) t.Fatalf("error filtering nodes: %+v", err)
} }
fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes) fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
list, err := prioritizeNodes(ctx, nil, fwk, state, test.pod, test.nodes) list, err := sched.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -3096,7 +3096,10 @@ func Test_prioritizeNodes(t *testing.T) {
for ii := range test.extenders { for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii]) extenders = append(extenders, &test.extenders[ii])
} }
nodesscores, err := prioritizeNodes(ctx, extenders, fwk, state, test.pod, test.nodes) sched := &Scheduler{
Extenders: extenders,
}
nodesscores, err := sched.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }