Merge pull request #84589 from draveness/feature/make-prioritize-nodes-private

feat: make prioritizeNodes private function of genericScheduler
Kubernetes Prow Robot 2019-10-31 09:11:52 -07:00 committed by GitHub
commit e8aad06769
2 changed files with 52 additions and 34 deletions
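
The change follows a common Go refactor: a package-level function whose parameters are all fields the genericScheduler already holds becomes an unexported method that reads those dependencies from its receiver, shrinking every call site. A hypothetical before/after sketch of that pattern (the names and types here are illustrative, not the scheduler's own):

```go
package sketch

// scorer stands in for any dependency the struct already owns.
type scorer interface {
	Score(node string) int64
}

type scheduler struct {
	scorers []scorer
}

// Before: a package-level function that must be handed every dependency.
func ScoreAll(scorers []scorer, nodes []string) map[string]int64 {
	out := make(map[string]int64, len(nodes))
	for _, n := range nodes {
		for _, s := range scorers {
			out[n] += s.Score(n)
		}
	}
	return out
}

// After: an unexported method; callers pass only what varies per call.
func (s *scheduler) scoreAll(nodes []string) map[string]int64 {
	out := make(map[string]int64, len(nodes))
	for _, n := range nodes {
		for _, sc := range s.scorers {
			out[n] += sc.Score(n)
		}
	}
	return out
}
```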


@@ -239,7 +239,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
 	}
 
 	metaPrioritiesInterface := g.priorityMetaProducer(pod, filteredNodes, g.nodeInfoSnapshot)
-	priorityList, err := PrioritizeNodes(ctx, pod, g.nodeInfoSnapshot, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, state)
+	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
 	if err != nil {
 		return result, err
 	}
@@ -695,28 +695,25 @@ func (g *genericScheduler) podFitsOnNode(
 	return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
 }
 
-// PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
+// prioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
 // Each priority function is expected to set a score of 0-10
 // 0 is the lowest priority score (least preferred node) and 10 is the highest
 // Each priority function can also have its own weight
 // The node scores returned by the priority function are multiplied by the weights to get weighted scores
 // All scores are finally combined (added) to get the total weighted scores of all nodes
-func PrioritizeNodes(
+func (g *genericScheduler) prioritizeNodes(
 	ctx context.Context,
+	state *framework.CycleState,
 	pod *v1.Pod,
-	snapshot *nodeinfosnapshot.Snapshot,
 	meta interface{},
-	priorityConfigs []priorities.PriorityConfig,
 	nodes []*v1.Node,
-	extenders []algorithm.SchedulerExtender,
-	fwk framework.Framework,
-	state *framework.CycleState) (framework.NodeScoreList, error) {
+) (framework.NodeScoreList, error) {
 	// If no priority configs are provided, then the EqualPriority function is applied
 	// This is required to generate the priority list in the required format
-	if len(priorityConfigs) == 0 && len(extenders) == 0 && !fwk.HasScorePlugins() {
+	if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
 		result := make(framework.NodeScoreList, 0, len(nodes))
 		for i := range nodes {
-			hostPriority, err := EqualPriorityMap(pod, meta, snapshot.NodeInfoMap[nodes[i].Name])
+			hostPriority, err := EqualPriorityMap(pod, meta, g.nodeInfoSnapshot.NodeInfoMap[nodes[i].Name])
 			if err != nil {
 				return nil, err
 			}
@@ -736,12 +733,12 @@ func PrioritizeNodes(
 		errs = append(errs, err)
 	}
 
-	results := make([]framework.NodeScoreList, len(priorityConfigs))
+	results := make([]framework.NodeScoreList, len(g.prioritizers))
 
 	// DEPRECATED: we can remove this when all priorityConfigs implement the
 	// Map-Reduce pattern.
-	for i := range priorityConfigs {
-		if priorityConfigs[i].Function != nil {
+	for i := range g.prioritizers {
+		if g.prioritizers[i].Function != nil {
 			wg.Add(1)
 			go func(index int) {
 				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_legacy").Inc()
@@ -750,7 +747,7 @@ func PrioritizeNodes(
 				wg.Done()
 			}()
 			var err error
-			results[index], err = priorityConfigs[index].Function(pod, snapshot, nodes)
+			results[index], err = g.prioritizers[index].Function(pod, g.nodeInfoSnapshot, nodes)
 			if err != nil {
 				appendError(err)
 			}
@@ -761,14 +758,14 @@ func PrioritizeNodes(
 	}
 
 	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
-		nodeInfo := snapshot.NodeInfoMap[nodes[index].Name]
-		for i := range priorityConfigs {
-			if priorityConfigs[i].Function != nil {
+		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
+		for i := range g.prioritizers {
+			if g.prioritizers[i].Function != nil {
 				continue
 			}
 
 			var err error
-			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
+			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
 			if err != nil {
 				appendError(err)
 				results[i][index].Name = nodes[index].Name
@@ -776,8 +773,8 @@ func PrioritizeNodes(
 		}
 	})
 
-	for i := range priorityConfigs {
-		if priorityConfigs[i].Reduce == nil {
+	for i := range g.prioritizers {
+		if g.prioritizers[i].Reduce == nil {
 			continue
 		}
 		wg.Add(1)
@@ -787,12 +784,12 @@ func PrioritizeNodes(
 				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
 				wg.Done()
 			}()
-			if err := priorityConfigs[index].Reduce(pod, meta, snapshot, results[index]); err != nil {
+			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
 				appendError(err)
 			}
 			if klog.V(10) {
 				for _, hostPriority := range results[index] {
-					klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, priorityConfigs[index].Name, hostPriority.Score)
+					klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, g.prioritizers[index].Name, hostPriority.Score)
 				}
 			}
 		}(i)
@@ -805,7 +802,7 @@ func PrioritizeNodes(
 
 	// Run the Score plugins.
 	state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
-	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
+	scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes)
 	if !scoreStatus.IsSuccess() {
 		return framework.NodeScoreList{}, scoreStatus.AsError()
 	}
@@ -815,8 +812,8 @@ func PrioritizeNodes(
 
 	for i := range nodes {
 		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
-		for j := range priorityConfigs {
-			result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
+		for j := range g.prioritizers {
+			result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
 		}
 
 		for j := range scoresMap {
@@ -824,10 +821,10 @@ func PrioritizeNodes(
 		}
 	}
 
-	if len(extenders) != 0 && nodes != nil {
-		combinedScores := make(map[string]int64, len(snapshot.NodeInfoList))
-		for i := range extenders {
-			if !extenders[i].IsInterested(pod) {
+	if len(g.extenders) != 0 && nodes != nil {
+		combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList))
+		for i := range g.extenders {
+			if !g.extenders[i].IsInterested(pod) {
 				continue
 			}
 			wg.Add(1)
@@ -837,7 +834,7 @@ func PrioritizeNodes(
 					metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec()
 					wg.Done()
 				}()
-				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
+				prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
 				if err != nil {
 					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
 					return
@@ -846,7 +843,7 @@ func PrioritizeNodes(
 				for i := range *prioritizedList {
 					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
 					if klog.V(10) {
-						klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
+						klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score)
 					}
 					combinedScores[host] += score * weight
 				}
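
The doc comment on prioritizeNodes above describes the scoring flow: every priority function scores every node in parallel, each raw score is multiplied by that function's weight, and the weighted scores are summed per node. A minimal standalone sketch of that pattern (the types and helper names here are illustrative stand-ins, not the scheduler's own API):

```go
package main

import (
	"fmt"
	"sync"
)

// nodeScore mirrors the idea of framework.NodeScore: a node name plus a score.
type nodeScore struct {
	Name  string
	Score int64
}

// prioritizer pairs a scoring function with its weight, loosely analogous to
// one priorities.PriorityConfig entry.
type prioritizer struct {
	Score  func(node string) int64
	Weight int64
}

// prioritizeNodesSketch runs every prioritizer against every node concurrently,
// then combines the weighted scores into one total per node.
func prioritizeNodesSketch(nodes []string, prioritizers []prioritizer) []nodeScore {
	// results[i][j] holds prioritizer i's raw score for node j.
	results := make([][]int64, len(prioritizers))
	for i := range results {
		results[i] = make([]int64, len(nodes))
	}

	var wg sync.WaitGroup
	for i := range prioritizers {
		for j := range nodes {
			wg.Add(1)
			go func(i, j int) {
				defer wg.Done()
				// Each goroutine writes to its own slot, so no locking is needed.
				results[i][j] = prioritizers[i].Score(nodes[j])
			}(i, j)
		}
	}
	wg.Wait()

	// Sum the weighted scores per node.
	total := make([]nodeScore, 0, len(nodes))
	for j := range nodes {
		ns := nodeScore{Name: nodes[j]}
		for i := range prioritizers {
			ns.Score += results[i][j] * prioritizers[i].Weight
		}
		total = append(total, ns)
	}
	return total
}

func main() {
	nodes := []string{"node-a", "node-b"}
	prioritizers := []prioritizer{
		{Score: func(n string) int64 { return int64(len(n)) % 11 }, Weight: 1},
		{Score: func(n string) int64 { return 10 }, Weight: 2},
	}
	for _, s := range prioritizeNodesSketch(nodes, prioritizers) {
		fmt.Printf("%s: %d\n", s.Name, s.Score)
	}
}
```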


@@ -1010,10 +1010,31 @@ func TestZeroRequest(t *testing.T) {
 			metaData := metaDataProducer(test.pod, test.nodes, snapshot)
-			list, err := PrioritizeNodes(
+			scheduler := NewGenericScheduler(
+				nil,
+				nil,
+				nil,
+				nil,
+				priorityConfigs,
+				metaDataProducer,
+				emptyFramework,
+				[]algorithm.SchedulerExtender{},
+				nil,
+				nil,
+				nil,
+				false,
+				false,
+				schedulerapi.DefaultPercentageOfNodesToScore,
+				false).(*genericScheduler)
+			scheduler.nodeInfoSnapshot = snapshot
+
+			list, err := scheduler.prioritizeNodes(
 				context.Background(),
-				test.pod, snapshot, metaData, priorityConfigs,
-				test.nodes, []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState())
+				framework.NewCycleState(),
+				test.pod,
+				metaData,
+				test.nodes,
+			)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
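
Because the test can no longer call a package-level PrioritizeNodes, it now builds a scheduler through NewGenericScheduler and type-asserts the returned interface to *genericScheduler to reach the unexported method, as the hunk above shows. A small, hypothetical illustration of that same-package testing pattern (the interface, constructor, and fields here are stand-ins, not the scheduler's real API):

```go
package sketch

import "testing"

// Scheduler is a stand-in for an exported interface returned by a constructor.
type Scheduler interface {
	Name() string
}

type genericSched struct {
	name     string
	snapshot map[string]int
}

func (g *genericSched) Name() string { return g.name }

// prioritize is unexported; only code in the same package can call it.
func (g *genericSched) prioritize(nodes []string) map[string]int {
	out := make(map[string]int, len(nodes))
	for _, n := range nodes {
		out[n] = g.snapshot[n]
	}
	return out
}

func NewScheduler(name string) Scheduler { return &genericSched{name: name} }

func TestPrioritize(t *testing.T) {
	// Type-assert the concrete type to reach unexported fields and methods,
	// mirroring NewGenericScheduler(...).(*genericScheduler) in the diff above.
	s := NewScheduler("test").(*genericSched)
	s.snapshot = map[string]int{"node-a": 3}

	got := s.prioritize([]string{"node-a"})
	if got["node-a"] != 3 {
		t.Fatalf("want 3, got %d", got["node-a"])
	}
}
```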