From a7eb7ed5c6a364ba815e5dd32c1ba598b19cdb57 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Sun, 11 Jun 2023 03:29:05 +0000 Subject: [PATCH 1/2] refactor: simplify RunScorePlugins for readability + performance --- pkg/scheduler/framework/runtime/framework.go | 64 +++++++++----------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 780e0a7923e..623c0b95c50 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -1016,62 +1016,56 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy allNodePluginScores := make([]framework.NodePluginScores, len(nodes)) numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len() plugins := make([]framework.ScorePlugin, 0, numPlugins) - pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins) + pluginToNodeScores := make([]framework.NodeScoreList, numPlugins) for _, pl := range f.scorePlugins { if state.SkipScorePlugins.Has(pl.Name()) { continue } plugins = append(plugins, pl) - pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes)) } ctx, cancel := context.WithCancel(ctx) defer cancel() errCh := parallelize.NewErrorChannel() - if len(plugins) > 0 { - logger := klog.FromContext(ctx) - logger = klog.LoggerWithName(logger, "Score") - // TODO(knelasevero): Remove duplicated keys from log entry calls - // When contextualized logging hits GA - // https://github.com/kubernetes/kubernetes/issues/111672 - logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) - // Run Score method for each node in parallel. - f.Parallelizer().Until(ctx, len(nodes), func(index int) { - nodeName := nodes[index].Name + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Score") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) + // Run Score method for each node in parallel. + f.Parallelizer().Until(ctx, len(plugins), func(i int) { + pl := plugins[i] + logger := klog.LoggerWithName(logger, pl.Name()) + nodeScores := make(framework.NodeScoreList, len(nodes)) + for index, node := range nodes { + nodeName := node.Name logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName}) - for _, pl := range plugins { - logger := klog.LoggerWithName(logger, pl.Name()) - ctx := klog.NewContext(ctx, logger) - s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) - if !status.IsSuccess() { - err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) - errCh.SendErrorWithCancel(err, cancel) - return - } - pluginToNodeScores[pl.Name()][index] = framework.NodeScore{ - Name: nodeName, - Score: s, - } + ctx := klog.NewContext(ctx, logger) + s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) + if !status.IsSuccess() { + err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) + errCh.SendErrorWithCancel(err, cancel) + return + } + nodeScores[index] = framework.NodeScore{ + Name: nodeName, + Score: s, } - }, metrics.Score) - if err := errCh.ReceiveError(); err != nil { - return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err)) } - } - // Run NormalizeScore method for each ScorePlugin in parallel. 
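(The hunk continues below with the removal of the formerly separate NormalizeScore loop.) Taken together, the framework.go changes do two things: per-plugin results move from a map keyed by plugin name into a slice indexed by plugin position, and the outer parallel loop now iterates over plugins instead of nodes, so each worker fills exactly one slice slot and can run its plugin's normalize step right after scoring. A minimal standalone sketch of that pattern, using plain goroutines in place of f.Parallelizer().Until and placeholder types (nodeScore, scorer) rather than the real framework API:

```go
package main

import (
	"fmt"
	"sync"
)

// nodeScore mirrors the shape of framework.NodeScore: a node name plus the
// score one plugin gave it.
type nodeScore struct {
	Name  string
	Score int64
}

// scorer is a stand-in for a framework.ScorePlugin; only the scoring
// callback matters for this sketch.
type scorer struct {
	name  string
	score func(node string) int64
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}
	plugins := []scorer{
		{name: "Constant", score: func(string) int64 { return 50 }},
		{name: "NameLength", score: func(n string) int64 { return int64(len(n)) }},
	}

	// One slot per plugin, mirroring the refactored pluginToNodeScores:
	// indexed by plugin position instead of keyed by plugin name, so the
	// hot path needs no map lookups.
	pluginToNodeScores := make([][]nodeScore, len(plugins))

	var wg sync.WaitGroup
	for i := range plugins {
		wg.Add(1)
		// Parallelize over plugins, as the refactored loop does with
		// f.Parallelizer().Until(ctx, len(plugins), ...). Each goroutine
		// writes only its own slice slot, so no locking is needed, and it
		// could run the plugin's normalize step on its own scores before
		// publishing them.
		go func(i int) {
			defer wg.Done()
			pl := plugins[i]
			scores := make([]nodeScore, len(nodes))
			for j, n := range nodes {
				scores[j] = nodeScore{Name: n, Score: pl.score(n)}
			}
			pluginToNodeScores[i] = scores
		}(i)
	}
	wg.Wait()

	for i, pl := range plugins {
		fmt.Println(pl.name, pluginToNodeScores[i])
	}
}
```

Because each goroutine touches only its own index, the result slice needs no synchronization, and the per-node inner loop no longer performs a map lookup per plugin, which is presumably where the "performance" part of the commit subject comes from.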
- f.Parallelizer().Until(ctx, len(plugins), func(index int) { - pl := plugins[index] if pl.ScoreExtensions() == nil { + pluginToNodeScores[i] = nodeScores return } - nodeScoreList := pluginToNodeScores[pl.Name()] - status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList) + + status := f.runScoreExtension(ctx, pl, state, pod, nodeScores) if !status.IsSuccess() { err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) errCh.SendErrorWithCancel(err, cancel) return } + pluginToNodeScores[i] = nodeScores }, metrics.Score) if err := errCh.ReceiveError(); err != nil { return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err)) @@ -1087,7 +1081,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy for i, pl := range plugins { weight := f.scorePluginWeight[pl.Name()] - nodeScoreList := pluginToNodeScores[pl.Name()] + nodeScoreList := pluginToNodeScores[i] score := nodeScoreList[index].Score if score > framework.MaxNodeScore || score < framework.MinNodeScore { From be14b026e33b682c52750741a2256373e480b7d7 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Sun, 11 Jun 2023 04:47:18 +0000 Subject: [PATCH 2/2] fix the integration test --- .../scheduler/plugins/plugins_test.go | 5 ++++ .../scheduler/scoring/priorities_test.go | 25 +++++++++++++------ 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index b0651cf1d81..94bf6d715c3 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -808,6 +808,11 @@ func TestPostFilterPlugin(t *testing.T) { {Name: filterPluginName}, }, }, + PreScore: configv1.PluginSet{ + Disabled: []configv1.Plugin{ + {Name: "*"}, + }, + }, Score: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: scorePluginName}, diff --git a/test/integration/scheduler/scoring/priorities_test.go b/test/integration/scheduler/scoring/priorities_test.go index d50d4d19996..de687fa82cb 100644 --- a/test/integration/scheduler/scoring/priorities_test.go +++ b/test/integration/scheduler/scoring/priorities_test.go @@ -72,11 +72,16 @@ const ( ) // This file tests the scheduler priority functions. 
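The plugins_test.go hunk above, and the priorities_test.go changes that follow, both use the same configuration idea: disable every default PreScore plugin with the "*" wildcard and re-enable only the plugin a given test actually needs. A simplified standalone sketch of how an enabled/disabled pair like that is resolved against the defaults for one extension point (illustrative plugin names, with weights and ordering details ignored):

```go
package main

import "fmt"

// applyPluginSet sketches how kube-scheduler resolves one extension point's
// plugin set: profile-disabled plugins are removed from the defaults ("*"
// removes them all), then the profile's enabled plugins are appended.
// This is a simplification of the real merge logic and ignores weights.
func applyPluginSet(defaults, enabled, disabled []string) []string {
	disabledAll := false
	disabledSet := map[string]bool{}
	for _, name := range disabled {
		if name == "*" {
			disabledAll = true
		}
		disabledSet[name] = true
	}

	var result []string
	if !disabledAll {
		for _, name := range defaults {
			if !disabledSet[name] {
				result = append(result, name)
			}
		}
	}
	return append(result, enabled...)
}

func main() {
	// An illustrative default PreScore list, not necessarily the exact set
	// kube-scheduler ships with.
	defaults := []string{"InterPodAffinity", "PodTopologySpread", "TaintToleration", "NodeAffinity"}

	// What the updated test helper effectively produces for, e.g., the
	// NodeAffinity case: every default PreScore plugin disabled, only the
	// plugin under test re-enabled.
	fmt.Println(applyPluginSet(defaults, []string{"NodeAffinity"}, []string{"*"}))
	// Output: [NodeAffinity]
}
```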
-func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *testutils.TestContext { - cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ +func initTestSchedulerForPriorityTest(t *testing.T, preScorePluginName, scorePluginName string) *testutils.TestContext { + cc := configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{{ SchedulerName: pointer.String(v1.DefaultSchedulerName), Plugins: &configv1.Plugins{ + PreScore: configv1.PluginSet{ + Disabled: []configv1.Plugin{ + {Name: "*"}, + }, + }, Score: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: scorePluginName, Weight: pointer.Int32(1)}, @@ -87,7 +92,11 @@ func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *tes }, }, }}, - }) + } + if preScorePluginName != "" { + cc.Profiles[0].Plugins.PreScore.Enabled = append(cc.Profiles[0].Plugins.PreScore.Enabled, configv1.Plugin{Name: preScorePluginName}) + } + cfg := configtesting.V1ToInternalWithDefaults(t, cc) testCtx := testutils.InitTestSchedulerWithOptions( t, testutils.InitTestAPIServer(t, strings.ToLower(scorePluginName), nil), @@ -201,7 +210,7 @@ func TestNodeResourcesScoring(t *testing.T) { // TestNodeAffinityScoring verifies that scheduler's node affinity priority function // works correctly. func TestNodeAffinityScoring(t *testing.T) { - testCtx := initTestSchedulerForPriorityTest(t, nodeaffinity.Name) + testCtx := initTestSchedulerForPriorityTest(t, nodeaffinity.Name, nodeaffinity.Name) // Add a few nodes. _, err := createAndWaitForNodesInCache(testCtx, "testnode", st.MakeNode(), 4) if err != nil { @@ -320,7 +329,7 @@ func TestPodAffinityScoring(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - testCtx := initTestSchedulerForPriorityTest(t, interpodaffinity.Name) + testCtx := initTestSchedulerForPriorityTest(t, interpodaffinity.Name, interpodaffinity.Name) // Add a few nodes. nodesInTopology, err := createAndWaitForNodesInCache(testCtx, "in-topology", st.MakeNode().Label(topologyKey, topologyValue), 5) if err != nil { @@ -364,7 +373,7 @@ func TestPodAffinityScoring(t *testing.T) { // TestImageLocalityScoring verifies that the scheduler's image locality priority function // works correctly, i.e., the pod gets scheduled to the node where its container images are ready. func TestImageLocalityScoring(t *testing.T) { - testCtx := initTestSchedulerForPriorityTest(t, imagelocality.Name) + testCtx := initTestSchedulerForPriorityTest(t, "", imagelocality.Name) // Create a node with the large image. // We use a fake large image as the test image used by the pod, which has @@ -596,7 +605,7 @@ func TestPodTopologySpreadScoring(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeInclusionPolicyInPodTopologySpread, tt.enableNodeInclusionPolicy)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MatchLabelKeysInPodTopologySpread, tt.enableMatchLabelKeys)() - testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name) + testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name, podtopologyspread.Name) cs := testCtx.ClientSet ns := testCtx.NS.Name @@ -646,7 +655,7 @@ func TestPodTopologySpreadScoring(t *testing.T) { // with the system default spreading spreads Pods belonging to a Service. // The setup has 300 nodes over 3 zones. 
func TestDefaultPodTopologySpreadScoring(t *testing.T) { - testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name) + testCtx := initTestSchedulerForPriorityTest(t, podtopologyspread.Name, podtopologyspread.Name) cs := testCtx.ClientSet ns := testCtx.NS.Name
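The updated call sites make the pairing explicit: plugins such as PodTopologySpread, InterPodAffinity and NodeAffinity implement both PreScore and Score, so their name is passed twice, while ImageLocality has no PreScore counterpart, so the empty string is passed. A minimal standalone sketch (not the real framework interfaces) of why the pairing matters: Score typically reads per-cycle state that PreScore wrote, so a test that wildcards away all PreScore plugins must re-enable the one its Score plugin depends on.

```go
package main

import "fmt"

// cycleState stands in for framework.CycleState: per-scheduling-cycle data
// written by PreScore and read by Score.
type cycleState map[string]any

// spreadPlugin sketches a plugin that participates in both extension points.
type spreadPlugin struct{}

func (spreadPlugin) Name() string { return "SketchTopologySpread" }

// PreScore precomputes data for every candidate node once per cycle.
func (p spreadPlugin) PreScore(state cycleState, nodes []string) {
	counts := make(map[string]int64, len(nodes))
	for i, n := range nodes {
		counts[n] = int64(i) // placeholder for real topology counts
	}
	state[p.Name()] = counts
}

// Score reads what PreScore stored; with no PreScore run there is nothing
// to read, which is why the test helper re-enables the matching PreScore
// plugin whenever one exists.
func (p spreadPlugin) Score(state cycleState, node string) (int64, error) {
	counts, ok := state[p.Name()].(map[string]int64)
	if !ok {
		return 0, fmt.Errorf("PreScore state for %s is missing", p.Name())
	}
	return counts[node], nil
}

func main() {
	p := spreadPlugin{}
	state := cycleState{}
	nodes := []string{"node-a", "node-b"}
	p.PreScore(state, nodes)
	for _, n := range nodes {
		s, err := p.Score(state, n)
		fmt.Println(n, s, err)
	}
}
```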