From 786be73b4ba5ad986b2e6afd28105cab066c7546 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Thu, 20 Oct 2022 00:05:13 +0000 Subject: [PATCH] feature(scheduler): won't run Filter if PreFilter returned a Skip status --- pkg/scheduler/framework/cycle_state.go | 5 + pkg/scheduler/framework/interface.go | 1 + pkg/scheduler/framework/runtime/framework.go | 18 +- .../framework/runtime/framework_test.go | 192 +++++++++++++++--- 4 files changed, 181 insertions(+), 35 deletions(-) diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go index 7f4a1351258..f414a17358c 100644 --- a/pkg/scheduler/framework/cycle_state.go +++ b/pkg/scheduler/framework/cycle_state.go @@ -19,6 +19,8 @@ package framework import ( "errors" "sync" + + "k8s.io/apimachinery/pkg/util/sets" ) var ( @@ -48,6 +50,8 @@ type CycleState struct { storage sync.Map // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. recordPluginMetrics bool + // SkipFilterPlugins are plugins that will be skipped in the Filter extension point. + SkipFilterPlugins sets.String } // NewCycleState initializes a new CycleState and returns its pointer. @@ -83,6 +87,7 @@ func (c *CycleState) Clone() *CycleState { return true }) copy.recordPluginMetrics = c.recordPluginMetrics + copy.SkipFilterPlugins = c.SkipFilterPlugins return copy } diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 67b12bd7baf..6cae9bd1cdb 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -91,6 +91,7 @@ const ( // Wait is used when a Permit plugin finds a pod scheduling should wait. Wait // Skip is used when a Bind plugin chooses to skip binding. + // Also, if a PreFilter plugin returns Skip, coupled Filter plugin will be skipped. Skip ) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 3d9aef7b495..81979c11d56 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -592,8 +592,10 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { // RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns // *Status and its code is set to non-success if any of the plugins returns -// anything but Success. If a non-success status is returned, then the scheduling -// cycle is aborted. +// anything but Success/Skip. +// Plugins that returned Skip status are recorded in the cyclestate, +// and they are skipped in the Filter extension point. +// If a non-success status is returned, then the scheduling cycle is aborted. func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) { startTime := time.Now() defer func() { @@ -601,15 +603,19 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor }() var result *framework.PreFilterResult var pluginsWithNodes []string + skipPlugins := sets.NewString() for _, pl := range f.preFilterPlugins { r, s := f.runPreFilterPlugin(ctx, pl, state, pod) - if !s.IsSuccess() { + if !s.IsSuccess() && !s.IsSkip() { s.SetFailedPlugin(pl.Name()) if s.IsUnschedulable() { return nil, s } return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithFailedPlugin(pl.Name()) } + if s.IsSkip() { + skipPlugins.Insert(pl.Name()) + } if !r.AllNodes() { pluginsWithNodes = append(pluginsWithNodes, pl.Name()) } @@ -621,8 +627,8 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor } return nil, framework.NewStatus(framework.Unschedulable, msg) } - } + state.SkipFilterPlugins = skipPlugins return result, nil } @@ -716,8 +722,12 @@ func (f *frameworkImpl) RunFilterPlugins( pod *v1.Pod, nodeInfo *framework.NodeInfo, ) framework.PluginToStatus { + skippedPlugins := state.SkipFilterPlugins statuses := make(framework.PluginToStatus) for _, pl := range f.filterPlugins { + if skippedPlugins.Has(pl.Name()) { + continue + } pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) if !pluginStatus.IsSuccess() { if !pluginStatus.IsUnschedulable() { diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 472ee0deb80..0aee06c5a4e 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1325,7 +1325,7 @@ func TestPreFilterPlugins(t *testing.T) { if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - f.RunPreFilterPlugins(ctx, nil, nil) + f.RunPreFilterPlugins(ctx, framework.NewCycleState(), nil) f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil) f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil) @@ -1344,40 +1344,152 @@ func TestPreFilterPlugins(t *testing.T) { }) } -func TestRunPreFilterPluginsStatus(t *testing.T) { - preFilter := &TestPlugin{ - name: preFilterPluginName, - inj: injectedResult{PreFilterStatus: int(framework.Error)}, +func TestRunPreFilterPlugins(t *testing.T) { + tests := []struct { + name string + plugins []*TestPlugin + wantPreFilterResult *framework.PreFilterResult + wantSkippedPlugins sets.String + wantStatus *framework.Status + }{ + { + name: "all PreFilter returned success", + plugins: []*TestPlugin{ + { + name: "success1", + }, + { + name: "success2", + }, + }, + wantPreFilterResult: nil, + wantStatus: nil, + }, + { + name: "one PreFilter plugin returned success, but another PreFilter plugin returned non-success", + plugins: []*TestPlugin{ + { + name: "success", + }, + { + name: "error", + inj: injectedResult{PreFilterStatus: int(framework.Error)}, + }, + }, + wantPreFilterResult: nil, + wantStatus: framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"), + }, + { + name: "one PreFilter plugin returned skip, but another PreFilter plugin returned non-success", + plugins: []*TestPlugin{ + { + name: "skip", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "error", + inj: injectedResult{PreFilterStatus: int(framework.Error)}, + }, + }, + wantPreFilterResult: nil, + wantStatus: framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"), + }, + { + name: "all PreFilter plugins returned skip", + plugins: []*TestPlugin{ + { + name: "skip1", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "skip2", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "skip3", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + }, + wantPreFilterResult: nil, + wantSkippedPlugins: sets.NewString("skip1", "skip2", "skip3"), + wantStatus: nil, + }, + { + name: "some PreFilter plugins returned skip", + plugins: []*TestPlugin{ + { + name: "skip1", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "success1", + }, + { + name: "skip2", + inj: injectedResult{PreFilterStatus: int(framework.Skip)}, + }, + { + name: "success2", + }, + }, + wantPreFilterResult: nil, + wantSkippedPlugins: sets.NewString("skip1", "skip2"), + wantStatus: nil, + }, } - r := make(Registry) - r.Register(preFilterPluginName, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { - return preFilter, nil + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := make(Registry) + enabled := make([]config.Plugin, len(tt.plugins)) + for i, p := range tt.plugins { + p := p + enabled[i].Name = p.name + r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return p, nil + }) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + f, err := newFrameworkWithQueueSortAndBind( + r, + config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, + ctx.Done(), + ) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + state := framework.NewCycleState() + result, status := f.RunPreFilterPlugins(ctx, state, nil) + if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" { + t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d) + } + if d := cmp.Diff(status, tt.wantStatus, cmp.Comparer(func(a, b *framework.Status) bool { + if a.Code() == framework.Error && b.Code() == framework.Error { + // we assume two error status is equal to each other if both contain the same reasons. + return cmp.Equal(a.Reasons(), b.Reasons()) + } + return a.Equal(b) + })); d != "" { + t.Errorf("wrong status. got: %v, want: %v, diff: %s", status, tt.wantStatus, d) + } + skipped := state.SkipFilterPlugins + if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" { + t.Errorf("wrong skip filter plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d) + } }) - - plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterPluginName}}}} - - profile := config.KubeSchedulerProfile{Plugins: plugins} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) - if err != nil { - t.Fatalf("Failed to create framework for testing: %v", err) - } - _, status := f.RunPreFilterPlugins(ctx, nil, nil) - wantStatus := framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", preFilter.Name(), errInjectedStatus)).WithFailedPlugin(preFilter.Name()) - if !reflect.DeepEqual(status, wantStatus) { - t.Errorf("wrong status. got: %v, want:%v", status, wantStatus) } } func TestFilterPlugins(t *testing.T) { tests := []struct { - name string - plugins []*TestPlugin - wantStatus *framework.Status - wantStatusMap framework.PluginToStatus + name string + plugins []*TestPlugin + skippedPlugins sets.String + wantStatus *framework.Status + wantStatusMap framework.PluginToStatus }{ { name: "SuccessFilter", @@ -1387,7 +1499,6 @@ func TestFilterPlugins(t *testing.T) { inj: injectedResult{FilterStatus: int(framework.Success)}, }, }, - wantStatus: nil, wantStatusMap: framework.PluginToStatus{}, }, { @@ -1465,6 +1576,23 @@ func TestFilterPlugins(t *testing.T) { wantStatus: nil, wantStatusMap: framework.PluginToStatus{}, }, + { + name: "SuccessAndSkipFilters", + plugins: []*TestPlugin{ + { + name: "TestPlugin1", + inj: injectedResult{FilterStatus: int(framework.Success)}, + }, + + { + name: "TestPlugin2", + inj: injectedResult{FilterStatus: int(framework.Error)}, // To make sure this plugins isn't called, set error as an injected result. + }, + }, + wantStatus: nil, + skippedPlugins: sets.NewString("TestPlugin2"), + wantStatusMap: framework.PluginToStatus{}, + }, { name: "ErrorAndSuccessFilters", plugins: []*TestPlugin{ @@ -1545,7 +1673,9 @@ func TestFilterPlugins(t *testing.T) { if err != nil { t.Fatalf("fail to create framework: %s", err) } - gotStatusMap := f.RunFilterPlugins(ctx, nil, pod, nil) + state := framework.NewCycleState() + state.SkipFilterPlugins = tt.skippedPlugins + gotStatusMap := f.RunFilterPlugins(ctx, state, pod, nil) gotStatus := gotStatusMap.Merge() if !reflect.DeepEqual(gotStatus, tt.wantStatus) { t.Errorf("wrong status code. got: %v, want:%v", gotStatus, tt.wantStatus) @@ -1779,7 +1909,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { t.Fatalf("fail to create framework: %s", err) } tt.nodeInfo.SetNode(tt.node) - gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo) + gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, framework.NewCycleState(), tt.pod, tt.nodeInfo) if !reflect.DeepEqual(gotStatus, tt.wantStatus) { t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus) }