diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go index e227c7da865..9227fe545d8 100644 --- a/pkg/scheduler/framework/cycle_state.go +++ b/pkg/scheduler/framework/cycle_state.go @@ -52,6 +52,8 @@ type CycleState struct { recordPluginMetrics bool // SkipFilterPlugins are plugins that will be skipped in the Filter extension point. SkipFilterPlugins sets.Set[string] + // SkipScorePlugins are plugins that will be skipped in the Score extension point. + SkipScorePlugins sets.Set[string] } // NewCycleState initializes a new CycleState and returns its pointer. @@ -88,6 +90,7 @@ func (c *CycleState) Clone() *CycleState { }) copy.recordPluginMetrics = c.recordPluginMetrics copy.SkipFilterPlugins = c.SkipFilterPlugins + copy.SkipScorePlugins = c.SkipScorePlugins return copy } diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 07640e5be58..8ce836620c4 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -94,6 +94,7 @@ const ( // Skip is used in the following scenarios: // - when a Bind plugin chooses to skip binding. // - when a PreFilter plugin returns Skip so that coupled Filter plugin/PreFilterExtensions() will be skipped. + // - when a PreScore plugin returns Skip so that coupled Score plugin will be skipped. Skip ) @@ -411,6 +412,8 @@ type PreScorePlugin interface { // PreScore is called by the scheduling framework after a list of nodes // passed the filtering phase. All prescore plugins must return success or // the pod will be rejected + // When it returns Skip status, other fields in status are just ignored, + // and coupled Score plugin will be skipped in this scheduling cycle. PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status } diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 1dbce09ff11..da2db2b6023 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -887,7 +887,9 @@ func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, sta } // RunPreScorePlugins runs the set of configured pre-score plugins. If any -// of these plugins returns any status other than "Success", the given pod is rejected. +// of these plugins returns any status other than Success/Skip, the given pod is rejected. +// When it returns Skip status, other fields in status are just ignored, +// and coupled Score plugin will be skipped in this scheduling cycle. func (f *frameworkImpl) RunPreScorePlugins( ctx context.Context, state *framework.CycleState, @@ -898,13 +900,18 @@ func (f *frameworkImpl) RunPreScorePlugins( defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(preScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() + skipPlugins := sets.New[string]() for _, pl := range f.preScorePlugins { status = f.runPreScorePlugin(ctx, pl, state, pod, nodes) + if status.IsSkip() { + skipPlugins.Insert(pl.Name()) + continue + } if !status.IsSuccess() { return framework.AsStatus(fmt.Errorf("running PreScore plugin %q: %w", pl.Name(), status.AsError())) } } - + state.SkipScorePlugins = skipPlugins return nil } @@ -928,37 +935,45 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() allNodePluginScores := make([]framework.NodePluginScores, len(nodes)) - pluginToNodeScores := make(map[string]framework.NodeScoreList, len(f.scorePlugins)) + numPlugins := len(f.scorePlugins) - state.SkipScorePlugins.Len() + plugins := make([]framework.ScorePlugin, 0, numPlugins) + pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins) for _, pl := range f.scorePlugins { + if state.SkipScorePlugins.Has(pl.Name()) { + continue + } + plugins = append(plugins, pl) pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes)) } ctx, cancel := context.WithCancel(ctx) defer cancel() errCh := parallelize.NewErrorChannel() - // Run Score method for each node in parallel. - f.Parallelizer().Until(ctx, len(nodes), func(index int) { - for _, pl := range f.scorePlugins { - nodeName := nodes[index].Name - s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) - if !status.IsSuccess() { - err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) - errCh.SendErrorWithCancel(err, cancel) - return - } - pluginToNodeScores[pl.Name()][index] = framework.NodeScore{ - Name: nodeName, - Score: s, + if len(plugins) > 0 { + // Run Score method for each node in parallel. + f.Parallelizer().Until(ctx, len(nodes), func(index int) { + for _, pl := range plugins { + nodeName := nodes[index].Name + s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) + if !status.IsSuccess() { + err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) + errCh.SendErrorWithCancel(err, cancel) + return + } + pluginToNodeScores[pl.Name()][index] = framework.NodeScore{ + Name: nodeName, + Score: s, + } } + }, score) + if err := errCh.ReceiveError(); err != nil { + return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err)) } - }, score) - if err := errCh.ReceiveError(); err != nil { - return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err)) } // Run NormalizeScore method for each ScorePlugin in parallel. - f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) { - pl := f.scorePlugins[index] + f.Parallelizer().Until(ctx, len(plugins), func(index int) { + pl := plugins[index] if pl.ScoreExtensions() == nil { return } @@ -979,10 +994,10 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy f.Parallelizer().Until(ctx, len(nodes), func(index int) { nodePluginScores := framework.NodePluginScores{ Name: nodes[index].Name, - Scores: make([]framework.PluginScore, len(f.scorePlugins)), + Scores: make([]framework.PluginScore, len(plugins)), } - for i, pl := range f.scorePlugins { + for i, pl := range plugins { weight := f.scorePluginWeight[pl.Name()] nodeScoreList := pluginToNodeScores[pl.Name()] score := nodeScoreList[index].Score diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 680a48f3411..a07af5259b4 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1061,13 +1061,139 @@ func TestPreEnqueuePlugins(t *testing.T) { } } +func TestRunPreScorePlugins(t *testing.T) { + tests := []struct { + name string + plugins []*TestPlugin + wantSkippedPlugins sets.Set[string] + wantStatusCode framework.Code + }{ + { + name: "all PreScorePlugins returned success", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "success2", + }, + }, + wantStatusCode: framework.Success, + }, + { + name: "one PreScore plugin returned success, but another PreScore plugin returned non-success", + plugins: []*TestPlugin{ + { + name: "success", + }, + { + name: "error", + inj: injectedResult{PreScoreStatus: int(framework.Error)}, + }, + }, + wantStatusCode: framework.Error, + }, + { + name: "one PreScore plugin returned skip, but another PreScore plugin returned non-success", + plugins: []*TestPlugin{ + { + name: "skip", + inj: injectedResult{PreScoreStatus: int(framework.Skip)}, + }, + { + name: "error", + inj: injectedResult{PreScoreStatus: int(framework.Error)}, + }, + }, + wantStatusCode: framework.Error, + }, + { + name: "all PreScore plugins returned skip", + plugins: []*TestPlugin{ + { + name: "skip1", + inj: injectedResult{PreScoreStatus: int(framework.Skip)}, + }, + { + name: "skip2", + inj: injectedResult{PreScoreStatus: int(framework.Skip)}, + }, + { + name: "skip3", + inj: injectedResult{PreScoreStatus: int(framework.Skip)}, + }, + }, + wantSkippedPlugins: sets.New("skip1", "skip2", "skip3"), + wantStatusCode: framework.Success, + }, + { + name: "some PreScore plugins returned skip", + plugins: []*TestPlugin{ + { + name: "skip1", + inj: injectedResult{PreScoreStatus: int(framework.Skip)}, + }, + { + name: "success1", + }, + { + name: "skip2", + inj: injectedResult{PreScoreStatus: int(framework.Skip)}, + }, + { + name: "success2", + }, + }, + wantSkippedPlugins: sets.New("skip1", "skip2"), + wantStatusCode: framework.Success, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := make(Registry) + enabled := make([]config.Plugin, len(tt.plugins)) + for i, p := range tt.plugins { + p := p + enabled[i].Name = p.name + r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return p, nil + }) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + f, err := newFrameworkWithQueueSortAndBind( + r, + config.KubeSchedulerProfile{Plugins: &config.Plugins{PreScore: config.PluginSet{Enabled: enabled}}}, + ctx.Done(), + ) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + state := framework.NewCycleState() + status := f.RunPreScorePlugins(ctx, state, nil, nil) + if status.Code() != tt.wantStatusCode { + t.Errorf("wrong status code. got: %v, want: %v", status, tt.wantStatusCode) + } + skipped := state.SkipScorePlugins + if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" { + t.Errorf("wrong skip score plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d) + } + }) + } +} + func TestRunScorePlugins(t *testing.T) { tests := []struct { - name string - registry Registry - plugins *config.Plugins - pluginConfigs []config.PluginConfig - want []framework.NodePluginScores + name string + registry Registry + plugins *config.Plugins + pluginConfigs []config.PluginConfig + want []framework.NodePluginScores + skippedPlugins sets.Set[string] // If err is true, we expect RunScorePlugin to fail. err bool }{ @@ -1345,6 +1471,70 @@ func TestRunScorePlugins(t *testing.T) { }, }, }, + { + name: "one success plugin, one skip plugin", + plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1), + pluginConfigs: []config.PluginConfig{ + { + Name: scorePlugin1, + Args: &runtime.Unknown{ + Raw: []byte(`{ "scoreRes": 1 }`), + }, + }, + { + Name: scoreWithNormalizePlugin1, + Args: &runtime.Unknown{ + Raw: []byte(`{ "scoreStatus": 1 }`), // To make sure this plugin isn't called, set error as an injected result. + }, + }, + }, + skippedPlugins: sets.New(scoreWithNormalizePlugin1), + want: []framework.NodePluginScores{ + { + Name: "node1", + Scores: []framework.PluginScore{ + { + Name: scorePlugin1, + Score: 1, + }, + }, + TotalScore: 1, + }, + { + Name: "node2", + Scores: []framework.PluginScore{ + { + Name: scorePlugin1, + Score: 1, + }, + }, + TotalScore: 1, + }, + }, + }, + { + name: "all plugins are skipped in prescore", + plugins: buildScoreConfigDefaultWeights(scorePlugin1), + pluginConfigs: []config.PluginConfig{ + { + Name: scorePlugin1, + Args: &runtime.Unknown{ + Raw: []byte(`{ "scoreStatus": 1 }`), // To make sure this plugin isn't called, set error as an injected result. + }, + }, + }, + skippedPlugins: sets.New(scorePlugin1), + want: []framework.NodePluginScores{ + { + Name: "node1", + Scores: []framework.PluginScore{}, + }, + { + Name: "node2", + Scores: []framework.PluginScore{}, + }, + }, + }, } for _, tt := range tests { @@ -1361,6 +1551,8 @@ func TestRunScorePlugins(t *testing.T) { t.Fatalf("Failed to create framework for testing: %v", err) } + state := framework.NewCycleState() + state.SkipScorePlugins = tt.skippedPlugins res, status := f.RunScorePlugins(ctx, state, pod, nodes) if tt.err { diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 2ce7ddd2a38..5d0936a9a9e 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -2016,6 +2016,21 @@ func TestSchedulerSchedulePod(t *testing.T) { wantNodes: sets.NewString("node2", "node3"), wantEvaluatedNodes: pointer.Int32(3), }, + { + name: "test all prescore plugins return skip", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + st.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", st.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0, + framework.NewStatus(framework.Skip, "fake skip"), + framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"), + ), "PreScore", "Score"), + }, + nodes: []string{"node1", "node2"}, + pod: st.MakePod().Name("ignore").UID("ignore").Obj(), + wantNodes: sets.NewString("node1", "node2"), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -2514,6 +2529,70 @@ func Test_prioritizeNodes(t *testing.T) { }, }, }, + { + name: "plugin which returned skip in preScore shouldn't be executed in the score phase", + pod: &v1.Pod{}, + nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)}, + pluginRegistrations: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1), + st.RegisterScorePlugin("Node2Prioritizer", st.NewNode2PrioritizerPlugin(), 1), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + st.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", st.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0, + framework.NewStatus(framework.Skip, "fake skip"), + framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"), + ), "PreScore", "Score"), + }, + extenders: nil, + want: []framework.NodePluginScores{ + { + Name: "node1", + Scores: []framework.PluginScore{ + { + Name: "Node2Prioritizer", + Score: 10, + }, + { + Name: "NodeResourcesBalancedAllocation", + Score: 100, + }, + }, + TotalScore: 110, + }, + { + Name: "node2", + Scores: []framework.PluginScore{ + { + Name: "Node2Prioritizer", + Score: 100, + }, + { + Name: "NodeResourcesBalancedAllocation", + Score: 100, + }, + }, + TotalScore: 200, + }, + }, + }, + { + name: "all score plugins are skipped", + pod: &v1.Pod{}, + nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)}, + pluginRegistrations: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + st.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", st.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0, + framework.NewStatus(framework.Skip, "fake skip"), + framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"), + ), "PreScore", "Score"), + }, + extenders: nil, + want: []framework.NodePluginScores{ + {Name: "node1", Scores: []framework.PluginScore{}}, + {Name: "node2", Scores: []framework.PluginScore{}}, + }, + }, } for _, test := range tests { @@ -2537,7 +2616,6 @@ func Test_prioritizeNodes(t *testing.T) { } state := framework.NewCycleState() - fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes) var extenders []framework.Extender for ii := range test.extenders { extenders = append(extenders, &test.extenders[ii]) diff --git a/pkg/scheduler/testing/fake_plugins.go b/pkg/scheduler/testing/fake_plugins.go index 0367f965e47..ce6fafa4a8f 100644 --- a/pkg/scheduler/testing/fake_plugins.go +++ b/pkg/scheduler/testing/fake_plugins.go @@ -245,3 +245,38 @@ func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) framew }, nil } } + +type FakePreScoreAndScorePlugin struct { + name string + score int64 + preScoreStatus *framework.Status + scoreStatus *framework.Status +} + +// Name returns name of the plugin. +func (pl *FakePreScoreAndScorePlugin) Name() string { + return pl.name +} + +func (pl *FakePreScoreAndScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + return pl.score, pl.scoreStatus +} + +func (pl *FakePreScoreAndScorePlugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +func (pl *FakePreScoreAndScorePlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { + return pl.preScoreStatus +} + +func NewFakePreScoreAndScorePlugin(name string, score int64, preScoreStatus, scoreStatus *framework.Status) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &FakePreScoreAndScorePlugin{ + name: name, + score: score, + preScoreStatus: preScoreStatus, + scoreStatus: scoreStatus, + }, nil + } +}