From b8a353a8a6689208338ddf827e9cb866fc712649 Mon Sep 17 00:00:00 2001 From: draveness Date: Thu, 31 Oct 2019 19:40:27 +0800 Subject: [PATCH] feat: make prioritizeNodes private function of genericScheduler --- pkg/scheduler/core/generic_scheduler.go | 59 ++++++++++---------- pkg/scheduler/core/generic_scheduler_test.go | 27 ++++++++- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 8f3202e37ac..2b7286cac7e 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -239,7 +239,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS } metaPrioritiesInterface := g.priorityMetaProducer(pod, filteredNodes, g.nodeInfoSnapshot) - priorityList, err := PrioritizeNodes(ctx, pod, g.nodeInfoSnapshot, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state) + priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes) if err != nil { return result, err } @@ -695,28 +695,25 @@ func (g *genericScheduler) podFitsOnNode( return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil } -// PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel. +// prioritizeNodes prioritizes the nodes by running the individual priority functions in parallel. // Each priority function is expected to set a score of 0-10 // 0 is the lowest priority score (least preferred node) and 10 is the highest // Each priority function can also have its own weight // The node scores returned by the priority function are multiplied by the weights to get weighted scores // All scores are finally combined (added) to get the total weighted scores of all nodes -func PrioritizeNodes( +func (g *genericScheduler) prioritizeNodes( ctx context.Context, + state *framework.CycleState, pod *v1.Pod, - snapshot *nodeinfosnapshot.Snapshot, meta interface{}, - priorityConfigs []priorities.PriorityConfig, nodes []*v1.Node, - extenders []algorithm.SchedulerExtender, - fwk framework.Framework, - state *framework.CycleState) (framework.NodeScoreList, error) { +) (framework.NodeScoreList, error) { // If no priority configs are provided, then the EqualPriority function is applied // This is required to generate the priority list in the required format - if len(priorityConfigs) == 0 && len(extenders) == 0 && !fwk.HasScorePlugins() { + if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() { result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { - hostPriority, err := EqualPriorityMap(pod, meta, snapshot.NodeInfoMap[nodes[i].Name]) + hostPriority, err := EqualPriorityMap(pod, meta, g.nodeInfoSnapshot.NodeInfoMap[nodes[i].Name]) if err != nil { return nil, err } @@ -736,12 +733,12 @@ func PrioritizeNodes( errs = append(errs, err) } - results := make([]framework.NodeScoreList, len(priorityConfigs)) + results := make([]framework.NodeScoreList, len(g.prioritizers)) // DEPRECATED: we can remove this when all priorityConfigs implement the // Map-Reduce pattern. - for i := range priorityConfigs { - if priorityConfigs[i].Function != nil { + for i := range g.prioritizers { + if g.prioritizers[i].Function != nil { wg.Add(1) go func(index int) { metrics.SchedulerGoroutines.WithLabelValues("prioritizing_legacy").Inc() @@ -750,7 +747,7 @@ func PrioritizeNodes( wg.Done() }() var err error - results[index], err = priorityConfigs[index].Function(pod, snapshot, nodes) + results[index], err = g.prioritizers[index].Function(pod, g.nodeInfoSnapshot, nodes) if err != nil { appendError(err) } @@ -761,14 +758,14 @@ func PrioritizeNodes( } workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) { - nodeInfo := snapshot.NodeInfoMap[nodes[index].Name] - for i := range priorityConfigs { - if priorityConfigs[i].Function != nil { + nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name] + for i := range g.prioritizers { + if g.prioritizers[i].Function != nil { continue } var err error - results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo) + results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo) if err != nil { appendError(err) results[i][index].Name = nodes[index].Name @@ -776,8 +773,8 @@ func PrioritizeNodes( } }) - for i := range priorityConfigs { - if priorityConfigs[i].Reduce == nil { + for i := range g.prioritizers { + if g.prioritizers[i].Reduce == nil { continue } wg.Add(1) @@ -787,12 +784,12 @@ func PrioritizeNodes( metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec() wg.Done() }() - if err := priorityConfigs[index].Reduce(pod, meta, snapshot, results[index]); err != nil { + if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil { appendError(err) } if klog.V(10) { for _, hostPriority := range results[index] { - klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, priorityConfigs[index].Name, hostPriority.Score) + klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, g.prioritizers[index].Name, hostPriority.Score) } } }(i) @@ -805,7 +802,7 @@ func PrioritizeNodes( // Run the Score plugins. state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta}) - scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes) + scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes) if !scoreStatus.IsSuccess() { return framework.NodeScoreList{}, scoreStatus.AsError() } @@ -815,8 +812,8 @@ func PrioritizeNodes( for i := range nodes { result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) - for j := range priorityConfigs { - result[i].Score += results[j][i].Score * priorityConfigs[j].Weight + for j := range g.prioritizers { + result[i].Score += results[j][i].Score * g.prioritizers[j].Weight } for j := range scoresMap { @@ -824,10 +821,10 @@ func PrioritizeNodes( } } - if len(extenders) != 0 && nodes != nil { - combinedScores := make(map[string]int64, len(snapshot.NodeInfoList)) - for i := range extenders { - if !extenders[i].IsInterested(pod) { + if len(g.extenders) != 0 && nodes != nil { + combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList)) + for i := range g.extenders { + if !g.extenders[i].IsInterested(pod) { continue } wg.Add(1) @@ -837,7 +834,7 @@ func PrioritizeNodes( metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec() wg.Done() }() - prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes) + prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes) if err != nil { // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities return @@ -846,7 +843,7 @@ func PrioritizeNodes( for i := range *prioritizedList { host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score if klog.V(10) { - klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score) + klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score) } combinedScores[host] += score * weight } diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index bc13ab068c7..914b2f1f8eb 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -1010,10 +1010,31 @@ func TestZeroRequest(t *testing.T) { metaData := metaDataProducer(test.pod, test.nodes, snapshot) - list, err := PrioritizeNodes( + scheduler := NewGenericScheduler( + nil, + nil, + nil, + nil, + priorityConfigs, + metaDataProducer, + emptyFramework, + []algorithm.SchedulerExtender{}, + nil, + nil, + nil, + false, + false, + schedulerapi.DefaultPercentageOfNodesToScore, + false).(*genericScheduler) + scheduler.nodeInfoSnapshot = snapshot + + list, err := scheduler.prioritizeNodes( context.Background(), - test.pod, snapshot, metaData, priorityConfigs, - test.nodes, []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState()) + framework.NewCycleState(), + test.pod, + metaData, + test.nodes, + ) if err != nil { t.Errorf("unexpected error: %v", err) }