diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index b93389167b4..33a0a98b288 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -89,6 +89,9 @@ type Configurator struct {
 	// Disable pod preemption or not.
 	disablePreemption bool
 
+	// Always check all predicates even if one of them fails.
+	alwaysCheckAllPredicates bool
+
 	// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
 	percentageOfNodesToScore int32
 
@@ -202,6 +205,11 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler,
 		c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
 	}
 
+	// When AlwaysCheckAllPredicates is set to true, the scheduler checks all the configured
+	// predicates even after one or more of them fails.
+	if policy.AlwaysCheckAllPredicates {
+		c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
+	}
 	return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
 }
 
@@ -250,6 +258,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
 		framework.WithClientSet(c.client),
 		framework.WithInformerFactory(c.informerFactory),
 		framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
+		framework.WithRunAllFilters(c.alwaysCheckAllPredicates),
 	)
 	if err != nil {
 		klog.Fatalf("error initializing the scheduling framework: %v", err)
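
Note: the flag enters through the v1 Policy API rather than the component config, so opting in is a matter of setting AlwaysCheckAllPredicates on the policy handed to CreateFromConfig. A minimal sketch, assuming an already-populated Configurator `c` and the usual `schedulerapi` alias for pkg/scheduler/api (client and informer setup omitted; this snippet is not part of the patch):

    // Illustrative only: a policy that enables exhaustive predicate checking.
    policy := schedulerapi.Policy{
    	AlwaysCheckAllPredicates: true, // copied onto c.alwaysCheckAllPredicates above
    }
    sched, err := c.CreateFromConfig(policy)
    if err != nil {
    	klog.Fatalf("couldn't create scheduler from policy: %v", err)
    }

CreateFromConfig then threads the value into the framework through WithRunAllFilters, as the next file shows.
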
diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go
index 6e37c0469ff..1c456874ebc 100644
--- a/pkg/scheduler/framework/v1alpha1/framework.go
+++ b/pkg/scheduler/framework/v1alpha1/framework.go
@@ -78,6 +78,10 @@ type framework struct {
 	informerFactory informers.SharedInformerFactory
 
 	metricsRecorder *metricsRecorder
+
+	// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
+	// after the first failure.
+	runAllFilters bool
 }
 
 // extensionPoint encapsulates desired and applied set of plugins at a specific extension
@@ -112,6 +116,7 @@ type frameworkOptions struct {
 	informerFactory      informers.SharedInformerFactory
 	snapshotSharedLister schedulerlisters.SharedLister
 	metricsRecorder      *metricsRecorder
+	runAllFilters        bool
 }
 
 // Option for the framework.
@@ -138,6 +143,14 @@ func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister
 	}
 }
 
+// WithRunAllFilters sets the runAllFilters flag, which means RunFilterPlugins accumulates
+// all failure Statuses.
+func WithRunAllFilters(runAllFilters bool) Option {
+	return func(o *frameworkOptions) {
+		o.runAllFilters = runAllFilters
+	}
+}
+
 // withMetricsRecorder is only used in tests.
 func withMetricsRecorder(recorder *metricsRecorder) Option {
 	return func(o *frameworkOptions) {
@@ -166,6 +179,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 		clientSet:       options.clientSet,
 		informerFactory: options.informerFactory,
 		metricsRecorder: options.metricsRecorder,
+		runAllFilters:   options.runAllFilters,
 	}
 	if plugins == nil {
 		return f, nil
 	}
@@ -395,27 +409,37 @@ func (f *framework) RunFilterPlugins(
 	state *CycleState,
 	pod *v1.Pod,
 	nodeInfo *schedulernodeinfo.NodeInfo,
-) (status *Status) {
+) (finalStatus *Status) {
 	if state.ShouldRecordFrameworkMetrics() {
 		startTime := time.Now()
 		defer func() {
-			f.metricsRecorder.observeExtensionPointDurationAsync(filter, status, metrics.SinceInSeconds(startTime))
+			f.metricsRecorder.observeExtensionPointDurationAsync(filter, finalStatus, metrics.SinceInSeconds(startTime))
 		}()
 	}
 	for _, pl := range f.filterPlugins {
-		status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
-		if !status.IsSuccess() {
-			if !status.IsUnschedulable() {
-				errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
-					pl.Name(), pod.Name, status.Message())
-				klog.Error(errMsg)
-				return NewStatus(Error, errMsg)
+		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
+		if !pluginStatus.IsSuccess() {
+			if !pluginStatus.IsUnschedulable() {
+				// Filter plugins are not supposed to return any status other than
+				// Success or Unschedulable.
+				return NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
 			}
-			return status
+			if !f.runAllFilters {
+				// Exit early if we don't need to run all filters.
+				return pluginStatus
+			}
+			// We need to continue and run all filters.
+			if finalStatus.IsSuccess() {
+				// This is the first failed plugin.
+				finalStatus = pluginStatus
+				continue
+			}
+			// We get here only if more than one Filter returns unschedulable and runAllFilters is true.
+			finalStatus.reasons = append(finalStatus.reasons, pluginStatus.reasons...)
 		}
 	}
-	return nil
+	return finalStatus
 }
 
 func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
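
With the plumbing above, a caller opts in at framework construction time. A hedged usage sketch (registry, plugins, args, and the scheduling-cycle arguments ctx, state, pod, and nodeInfo stand in for real configuration):

    // Sketch: build a framework that runs every Filter plugin.
    fw, err := NewFramework(registry, plugins, args, WithRunAllFilters(true))
    if err != nil {
    	klog.Fatalf("error initializing the scheduling framework: %v", err)
    }
    // A pod rejected by several plugins now comes back as a single Status whose
    // code is the first failure's code and whose reasons hold one entry per
    // failing plugin; nil still means every plugin returned Success.
    status := fw.RunFilterPlugins(ctx, state, pod, nodeInfo)
    if status.IsUnschedulable() {
    	klog.Infof("pod %q is unschedulable: %v", pod.Name, status.Message())
    }
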
diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go
index 5be6dd81107..0aa5c800595 100644
--- a/pkg/scheduler/framework/v1alpha1/framework_test.go
+++ b/pkg/scheduler/framework/v1alpha1/framework_test.go
@@ -165,7 +165,7 @@ func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions {
 }
 
 func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
-	return NewStatus(Code(pl.inj.FilterStatus), "injected status")
+	return NewStatus(Code(pl.inj.FilterStatus), "injected filter status")
 }
 
 func (pl *TestPlugin) PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status {
@@ -598,9 +598,10 @@ func TestPreFilterPlugins(t *testing.T) {
 
 func TestFilterPlugins(t *testing.T) {
 	tests := []struct {
-		name     string
-		plugins  []*TestPlugin
-		wantCode Code
+		name          string
+		plugins       []*TestPlugin
+		wantStatus    *Status
+		runAllFilters bool
 	}{
 		{
 			name: "SuccessFilter",
@@ -610,7 +611,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Success)},
 				},
 			},
-			wantCode: Success,
+			wantStatus: nil,
 		},
 		{
 			name: "ErrorFilter",
@@ -620,7 +621,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Error)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "UnschedulableFilter",
@@ -630,7 +631,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Unschedulable)},
 				},
 			},
-			wantCode: Unschedulable,
+			wantStatus: NewStatus(Unschedulable, "injected filter status"),
 		},
 		{
 			name: "UnschedulableAndUnresolvableFilter",
@@ -641,7 +642,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj: injectedResult{
 						FilterStatus: int(UnschedulableAndUnresolvable)},
 				},
 			},
-			wantCode: UnschedulableAndUnresolvable,
+			wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status"),
 		},
 		// followings tests cover multiple-plugins scenarios
 		{
@@ -657,7 +658,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Error)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "SuccessAndSuccessFilters",
@@ -672,7 +673,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Success)},
 				},
 			},
-			wantCode: Success,
+			wantStatus: nil,
 		},
 		{
 			name: "ErrorAndSuccessFilters",
@@ -686,7 +687,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Success)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "SuccessAndErrorFilters",
@@ -701,7 +702,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Error)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "SuccessAndUnschedulableFilters",
@@ -716,7 +717,50 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Unschedulable)},
 				},
 			},
-			wantCode: Unschedulable,
+			wantStatus: NewStatus(Unschedulable, "injected filter status"),
+		},
+		{
+			name: "SuccessFilterWithRunAllFilters",
+			plugins: []*TestPlugin{
+				{
+					name: "TestPlugin",
+					inj:  injectedResult{FilterStatus: int(Success)},
+				},
+			},
+			runAllFilters: true,
+			wantStatus:    nil,
+		},
+		{
+			name: "ErrorAndErrorFilters",
+			plugins: []*TestPlugin{
+				{
+					name: "TestPlugin1",
+					inj:  injectedResult{FilterStatus: int(Error)},
+				},
+
+				{
+					name: "TestPlugin2",
+					inj:  injectedResult{FilterStatus: int(Error)},
+				},
+			},
+			runAllFilters: true,
+			wantStatus:    NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
+		},
+		{
+			name: "UnschedulableAndUnresolvableAndUnschedulableFilters",
+			plugins: []*TestPlugin{
+				{
+					name: "TestPlugin1",
+					inj:  injectedResult{FilterStatus: int(UnschedulableAndUnresolvable)},
+				},
+
+				{
+					name: "TestPlugin2",
+					inj:  injectedResult{FilterStatus: int(Unschedulable)},
+				},
+			},
+			runAllFilters: true,
+			wantStatus:    NewStatus(UnschedulableAndUnresolvable, "injected filter status", "injected filter status"),
 		},
 	}
 
@@ -739,13 +783,13 @@ func TestFilterPlugins(t *testing.T) {
 					config.Plugin{Name: pl.name})
 			}
 
-			f, err := NewFramework(registry, cfgPls, emptyArgs)
+			f, err := NewFramework(registry, cfgPls, emptyArgs, WithRunAllFilters(tt.runAllFilters))
 			if err != nil {
 				t.Fatalf("fail to create framework: %s", err)
 			}
 			status := f.RunFilterPlugins(context.TODO(), nil, pod, nil)
-			if status.Code() != tt.wantCode {
-				t.Errorf("Wrong status code. got: %v, want:%v", status.Code(), tt.wantCode)
+			if !reflect.DeepEqual(status, tt.wantStatus) {
+				t.Errorf("Wrong status. got: %v, want: %v", status, tt.wantStatus)
 			}
 		})
 	}
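
The assertions above switch from comparing bare codes to reflect.DeepEqual on the whole *Status, because with runAllFilters a status can carry several reasons. DeepEqual inspects the unexported code and reasons fields, so the expected status must list its reason strings in the exact order the plugins failed. A tiny illustration with hypothetical reasons (not from the test file):

    a := NewStatus(Unschedulable, "reason A", "reason B")
    b := NewStatus(Unschedulable, "reason A", "reason B")
    c := NewStatus(Unschedulable, "reason B", "reason A")
    fmt.Println(reflect.DeepEqual(a, b)) // true
    fmt.Println(reflect.DeepEqual(a, c)) // false: reason order matters
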
diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go
index eed3c66d64f..56f7a4ebdaf 100644
--- a/pkg/scheduler/framework/v1alpha1/interface.go
+++ b/pkg/scheduler/framework/v1alpha1/interface.go
@@ -379,12 +379,10 @@ type Framework interface {
 	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
 
 	// RunFilterPlugins runs the set of configured filter plugins for pod on
-	// the given node. It returns directly if any of the filter plugins
-	// return any status other than "Success". Note that for the node being
-	// evaluated, the passed nodeInfo reference could be different from the
-	// one in NodeInfoSnapshot map (e.g., pods considered to be running on
-	// the node could be different). For example, during preemption, we may
-	// pass a copy of the original nodeInfo object that has some pods
+	// the given node. Note that for the node being evaluated, the passed nodeInfo
+	// reference could be different from the one in NodeInfoSnapshot map (e.g., pods
+	// considered to be running on the node could be different). For example, during
+	// preemption, we may pass a copy of the original nodeInfo object that has some pods
 	// removed from it to evaluate the possibility of preempting them to
 	// schedule the target pod.
 	RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 052734f8e63..59ba739de2d 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -897,7 +897,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
 				FindErr: findErr,
 			},
 			eventReason: "FailedScheduling",
-			expectError: fmt.Errorf("error while running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
+			expectError: fmt.Errorf("running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
 		},
 		{
 			name: "assume error",
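
Taken together, the accumulation rule in RunFilterPlugins is: the first failing plugin fixes the returned code, and each later Unschedulable-class failure only appends its reasons (a later UnschedulableAndUnresolvable cannot upgrade an earlier Unschedulable code). A standalone restatement of that rule, a sketch rather than code from this patch:

    // merge mirrors the loop in RunFilterPlugins: the first failure wins the
    // code; subsequent failures only contribute reasons.
    func merge(statuses []*Status) *Status {
    	var final *Status
    	for _, s := range statuses {
    		if s.IsSuccess() {
    			continue
    		}
    		if final.IsSuccess() { // a nil *Status counts as success
    			final = s
    			continue
    		}
    		final.reasons = append(final.reasons, s.reasons...)
    	}
    	return final
    }

The Error message also loses its "error while " prefix (mirrored in scheduler_test.go), presumably so that callers wrapping the status message in their own error do not stutter.
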