refactor: simplify RunScorePlugins for readability + performance

This commit is contained in:
Kensei Nakada 2023-06-11 03:29:05 +00:00
parent 336e46101c
commit a7eb7ed5c6

View File

@ -1016,19 +1016,17 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
allNodePluginScores := make([]framework.NodePluginScores, len(nodes)) allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len() numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len()
plugins := make([]framework.ScorePlugin, 0, numPlugins) plugins := make([]framework.ScorePlugin, 0, numPlugins)
pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins) pluginToNodeScores := make([]framework.NodeScoreList, numPlugins)
for _, pl := range f.scorePlugins { for _, pl := range f.scorePlugins {
if state.SkipScorePlugins.Has(pl.Name()) { if state.SkipScorePlugins.Has(pl.Name()) {
continue continue
} }
plugins = append(plugins, pl) plugins = append(plugins, pl)
pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
errCh := parallelize.NewErrorChannel() errCh := parallelize.NewErrorChannel()
if len(plugins) > 0 {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "Score") logger = klog.LoggerWithName(logger, "Score")
// TODO(knelasevero): Remove duplicated keys from log entry calls // TODO(knelasevero): Remove duplicated keys from log entry calls
@ -1036,11 +1034,13 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
// https://github.com/kubernetes/kubernetes/issues/111672 // https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
// Run Score method for each node in parallel. // Run Score method for each node in parallel.
f.Parallelizer().Until(ctx, len(nodes), func(index int) { f.Parallelizer().Until(ctx, len(plugins), func(i int) {
nodeName := nodes[index].Name pl := plugins[i]
logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
for _, pl := range plugins {
logger := klog.LoggerWithName(logger, pl.Name()) logger := klog.LoggerWithName(logger, pl.Name())
nodeScores := make(framework.NodeScoreList, len(nodes))
for index, node := range nodes {
nodeName := node.Name
logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
ctx := klog.NewContext(ctx, logger) ctx := klog.NewContext(ctx, logger)
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() { if !status.IsSuccess() {
@ -1048,30 +1048,24 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
errCh.SendErrorWithCancel(err, cancel) errCh.SendErrorWithCancel(err, cancel)
return return
} }
pluginToNodeScores[pl.Name()][index] = framework.NodeScore{ nodeScores[index] = framework.NodeScore{
Name: nodeName, Name: nodeName,
Score: s, Score: s,
} }
} }
}, metrics.Score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
}
}
// Run NormalizeScore method for each ScorePlugin in parallel.
f.Parallelizer().Until(ctx, len(plugins), func(index int) {
pl := plugins[index]
if pl.ScoreExtensions() == nil { if pl.ScoreExtensions() == nil {
pluginToNodeScores[i] = nodeScores
return return
} }
nodeScoreList := pluginToNodeScores[pl.Name()]
status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList) status := f.runScoreExtension(ctx, pl, state, pod, nodeScores)
if !status.IsSuccess() { if !status.IsSuccess() {
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
errCh.SendErrorWithCancel(err, cancel) errCh.SendErrorWithCancel(err, cancel)
return return
} }
pluginToNodeScores[i] = nodeScores
}, metrics.Score) }, metrics.Score)
if err := errCh.ReceiveError(); err != nil { if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err)) return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
@ -1087,7 +1081,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
for i, pl := range plugins { for i, pl := range plugins {
weight := f.scorePluginWeight[pl.Name()] weight := f.scorePluginWeight[pl.Name()]
nodeScoreList := pluginToNodeScores[pl.Name()] nodeScoreList := pluginToNodeScores[i]
score := nodeScoreList[index].Score score := nodeScoreList[index].Score
if score > framework.MaxNodeScore || score < framework.MinNodeScore { if score > framework.MaxNodeScore || score < framework.MinNodeScore {