From 2863b3d1ab7e4a99e3809561910560783b5ae2d9 Mon Sep 17 00:00:00 2001 From: AxeZhan Date: Thu, 20 Jul 2023 10:50:32 +0800 Subject: [PATCH] Revert "refactor: simplify RunScorePlugins for readability + performance" This reverts commit a7eb7ed5c6a364ba815e5dd32c1ba598b19cdb57. --- pkg/scheduler/framework/runtime/framework.go | 64 +++++++++++--------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 679bdb995cf..f83b3d45472 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -1017,56 +1017,62 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy allNodePluginScores := make([]framework.NodePluginScores, len(nodes)) numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len() plugins := make([]framework.ScorePlugin, 0, numPlugins) - pluginToNodeScores := make([]framework.NodeScoreList, numPlugins) + pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins) for _, pl := range f.scorePlugins { if state.SkipScorePlugins.Has(pl.Name()) { continue } plugins = append(plugins, pl) + pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes)) } ctx, cancel := context.WithCancel(ctx) defer cancel() errCh := parallelize.NewErrorChannel() - logger := klog.FromContext(ctx) - logger = klog.LoggerWithName(logger, "Score") - // TODO(knelasevero): Remove duplicated keys from log entry calls - // When contextualized logging hits GA - // https://github.com/kubernetes/kubernetes/issues/111672 - logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) - // Run Score method for each node in parallel. - f.Parallelizer().Until(ctx, len(plugins), func(i int) { - pl := plugins[i] - logger := klog.LoggerWithName(logger, pl.Name()) - nodeScores := make(framework.NodeScoreList, len(nodes)) - for index, node := range nodes { - nodeName := node.Name + if len(plugins) > 0 { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Score") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) + // Run Score method for each node in parallel. + f.Parallelizer().Until(ctx, len(nodes), func(index int) { + nodeName := nodes[index].Name logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName}) - ctx := klog.NewContext(ctx, logger) - s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) - if !status.IsSuccess() { - err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) - errCh.SendErrorWithCancel(err, cancel) - return - } - nodeScores[index] = framework.NodeScore{ - Name: nodeName, - Score: s, + for _, pl := range plugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) + s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) + if !status.IsSuccess() { + err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) + errCh.SendErrorWithCancel(err, cancel) + return + } + pluginToNodeScores[pl.Name()][index] = framework.NodeScore{ + Name: nodeName, + Score: s, + } } + }, metrics.Score) + if err := errCh.ReceiveError(); err != nil { + return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err)) } + } + // Run NormalizeScore method for each ScorePlugin in parallel. + f.Parallelizer().Until(ctx, len(plugins), func(index int) { + pl := plugins[index] if pl.ScoreExtensions() == nil { - pluginToNodeScores[i] = nodeScores return } - - status := f.runScoreExtension(ctx, pl, state, pod, nodeScores) + nodeScoreList := pluginToNodeScores[pl.Name()] + status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList) if !status.IsSuccess() { err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) errCh.SendErrorWithCancel(err, cancel) return } - pluginToNodeScores[i] = nodeScores }, metrics.Score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err)) @@ -1082,7 +1088,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy for i, pl := range plugins { weight := f.scorePluginWeight[pl.Name()] - nodeScoreList := pluginToNodeScores[i] + nodeScoreList := pluginToNodeScores[pl.Name()] score := nodeScoreList[index].Score if score > framework.MaxNodeScore || score < framework.MinNodeScore {