Merge pull request #86496 from ahg-g/ahg1-check

Support AlwaysCheckAllPredicates in the scheduler framework.
Kubernetes Prow Robot 2019-12-23 09:53:33 -08:00 committed by GitHub
commit 06fb3eb582
5 changed files with 109 additions and 34 deletions

View File

@@ -89,6 +89,9 @@ type Configurator struct {
     // Disable pod preemption or not.
     disablePreemption bool
+    // Always check all predicates, even if one of them fails along the way.
+    alwaysCheckAllPredicates bool
     // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
     percentageOfNodesToScore int32
@@ -202,6 +205,11 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler,
         c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
     }
+    // When AlwaysCheckAllPredicates is set to true, the scheduler checks all the
+    // configured predicates even after one or more of them fails.
+    if policy.AlwaysCheckAllPredicates {
+        c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
+    }
     return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
 }
@@ -250,6 +258,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
         framework.WithClientSet(c.client),
         framework.WithInformerFactory(c.informerFactory),
         framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
+        framework.WithRunAllFilters(c.alwaysCheckAllPredicates),
     )
     if err != nil {
         klog.Fatalf("error initializing the scheduling framework: %v", err)
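Taken together, this file's changes plumb the Policy's existing AlwaysCheckAllPredicates field through the Configurator and into the framework via the new WithRunAllFilters option (defined in the next file). A minimal, self-contained sketch of that plumbing with toy types — the real Policy and Configurator carry many more fields, and the lowercase method names here are hypothetical stand-ins:

package main

import "fmt"

// Toy stand-ins; only the one field this PR cares about is modeled.
type Policy struct{ AlwaysCheckAllPredicates bool }

type Configurator struct{ alwaysCheckAllPredicates bool }

// createFromConfig mirrors CreateFromConfig above: lift the flag off the
// Policy and store it on the Configurator.
func (c *Configurator) createFromConfig(policy Policy) {
	if policy.AlwaysCheckAllPredicates {
		c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
	}
}

// createFromKeys mirrors CreateFromKeys above: forward the stored flag to the
// framework constructor (there it becomes framework.WithRunAllFilters(...)).
func (c *Configurator) createFromKeys() bool {
	return c.alwaysCheckAllPredicates
}

func main() {
	c := &Configurator{}
	c.createFromConfig(Policy{AlwaysCheckAllPredicates: true})
	fmt.Println(c.createFromKeys()) // true
}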

View File

@@ -78,6 +78,10 @@ type framework struct {
     informerFactory informers.SharedInformerFactory
     metricsRecorder *metricsRecorder
+    // Indicates that RunFilterPlugins should accumulate all failed statuses and not return
+    // after the first failure.
+    runAllFilters bool
 }

 // extensionPoint encapsulates desired and applied set of plugins at a specific extension
@@ -112,6 +116,7 @@ type frameworkOptions struct {
     informerFactory informers.SharedInformerFactory
     snapshotSharedLister schedulerlisters.SharedLister
     metricsRecorder *metricsRecorder
+    runAllFilters bool
 }

 // Option for the framework.
@@ -138,6 +143,14 @@ func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister
     }
 }

+// WithRunAllFilters sets the runAllFilters flag, which means RunFilterPlugins accumulates
+// all failure Statuses.
+func WithRunAllFilters(runAllFilters bool) Option {
+    return func(o *frameworkOptions) {
+        o.runAllFilters = runAllFilters
+    }
+}
+
 // withMetricsRecorder is only used in tests.
 func withMetricsRecorder(recorder *metricsRecorder) Option {
     return func(o *frameworkOptions) {
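WithRunAllFilters follows the same functional-options pattern as the neighboring With* helpers: each option is a closure that mutates a frameworkOptions struct before construction. A self-contained sketch of the pattern with toy types (the real frameworkOptions carries several more fields):

package main

import "fmt"

// Toy versions of frameworkOptions and Option.
type frameworkOptions struct {
	runAllFilters bool
}

type Option func(*frameworkOptions)

// WithRunAllFilters mirrors the helper added in this PR.
func WithRunAllFilters(runAllFilters bool) Option {
	return func(o *frameworkOptions) {
		o.runAllFilters = runAllFilters
	}
}

// newFramework applies each option to a defaults struct, the same shape
// NewFramework above uses.
func newFramework(opts ...Option) *frameworkOptions {
	options := &frameworkOptions{} // defaults: runAllFilters is false
	for _, opt := range opts {
		opt(options)
	}
	return options
}

func main() {
	f := newFramework(WithRunAllFilters(true))
	fmt.Println(f.runAllFilters) // true
}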
@@ -166,6 +179,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
         clientSet: options.clientSet,
         informerFactory: options.informerFactory,
         metricsRecorder: options.metricsRecorder,
+        runAllFilters: options.runAllFilters,
     }
     if plugins == nil {
         return f, nil
@@ -395,27 +409,37 @@ func (f *framework) RunFilterPlugins(
     state *CycleState,
     pod *v1.Pod,
     nodeInfo *schedulernodeinfo.NodeInfo,
-) (status *Status) {
+) (finalStatus *Status) {
     if state.ShouldRecordFrameworkMetrics() {
         startTime := time.Now()
         defer func() {
-            f.metricsRecorder.observeExtensionPointDurationAsync(filter, status, metrics.SinceInSeconds(startTime))
+            f.metricsRecorder.observeExtensionPointDurationAsync(filter, finalStatus, metrics.SinceInSeconds(startTime))
         }()
     }
     for _, pl := range f.filterPlugins {
-        status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
-        if !status.IsSuccess() {
-            if !status.IsUnschedulable() {
-                errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
-                    pl.Name(), pod.Name, status.Message())
-                klog.Error(errMsg)
-                return NewStatus(Error, errMsg)
+        pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
+        if !pluginStatus.IsSuccess() {
+            if !pluginStatus.IsUnschedulable() {
+                // Filter plugins are not supposed to return any status other than
+                // Success or Unschedulable.
+                return NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
             }
-            return status
+            if !f.runAllFilters {
+                // Exit early if we don't need to run all filters.
+                return pluginStatus
+            }
+            // We need to continue and run all filters.
+            if finalStatus.IsSuccess() {
+                // This is the first failed plugin.
+                finalStatus = pluginStatus
+                continue
+            }
+            // We get here only if more than one Filter returns unschedulable and runAllFilters is true.
+            finalStatus.reasons = append(finalStatus.reasons, pluginStatus.reasons...)
         }
     }
-    return nil
+    return finalStatus
 }

 func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
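The rewritten loop has two modes. By default it still returns on the first failing filter; with runAllFilters set it keeps evaluating, keeps the code of the first failure, and appends every later failure's reasons to it. Either way, a status that is neither Success nor Unschedulable aborts immediately with an Error. A self-contained sketch of just that control flow, with a trimmed-down Status type standing in for the framework's:

package main

import "fmt"

type Code int

const (
	Success Code = iota
	Unschedulable
	UnschedulableAndUnresolvable
	Error
)

// Status is a toy stand-in for the framework's Status type.
type Status struct {
	code    Code
	reasons []string
}

// IsSuccess treats a nil status as success, like the real framework does.
func (s *Status) IsSuccess() bool { return s == nil || s.code == Success }

// runFilters mirrors the control flow of RunFilterPlugins above: exit on the
// first failure, or, when runAll is set, fold every later failure's reasons
// into the first failed status (whose code is kept).
func runFilters(runAll bool, statuses ...*Status) *Status {
	var final *Status
	for _, s := range statuses {
		if s.IsSuccess() {
			continue
		}
		if !runAll {
			return s // exit early on the first failure
		}
		if final.IsSuccess() {
			final = s // first failure: keep its code and reasons
			continue
		}
		final.reasons = append(final.reasons, s.reasons...)
	}
	return final
}

func main() {
	a := &Status{code: UnschedulableAndUnresolvable, reasons: []string{"node(s) had taints"}}
	b := &Status{code: Unschedulable, reasons: []string{"insufficient cpu"}}

	fmt.Println(runFilters(false, a, b).reasons) // [node(s) had taints]
	fmt.Println(runFilters(true, a, b).reasons)  // [node(s) had taints insufficient cpu]
}

This is exactly the behavior the new two-unschedulable-plugins test case below asserts: the first failure's code wins and both reasons are reported.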

View File

@@ -165,7 +165,7 @@ func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions {
 }

 func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
-    return NewStatus(Code(pl.inj.FilterStatus), "injected status")
+    return NewStatus(Code(pl.inj.FilterStatus), "injected filter status")
 }

 func (pl *TestPlugin) PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status {
@@ -600,7 +600,8 @@ func TestFilterPlugins(t *testing.T) {
     tests := []struct {
         name string
         plugins []*TestPlugin
-        wantCode Code
+        wantStatus *Status
+        runAllFilters bool
     }{
         {
             name: "SuccessFilter",
@@ -610,7 +611,7 @@
                     inj: injectedResult{FilterStatus: int(Success)},
                 },
             },
-            wantCode: Success,
+            wantStatus: nil,
         },
         {
             name: "ErrorFilter",
@@ -620,7 +621,7 @@
                     inj: injectedResult{FilterStatus: int(Error)},
                 },
             },
-            wantCode: Error,
+            wantStatus: NewStatus(Error, `running "TestPlugin" filter plugin for pod "": injected filter status`),
         },
         {
             name: "UnschedulableFilter",
@@ -630,7 +631,7 @@
                     inj: injectedResult{FilterStatus: int(Unschedulable)},
                 },
             },
-            wantCode: Unschedulable,
+            wantStatus: NewStatus(Unschedulable, "injected filter status"),
         },
         {
             name: "UnschedulableAndUnresolvableFilter",
@@ -641,7 +642,7 @@
                         FilterStatus: int(UnschedulableAndUnresolvable)},
                 },
             },
-            wantCode: UnschedulableAndUnresolvable,
+            wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status"),
         },
         // The following tests cover multiple-plugin scenarios.
         {
@@ -657,7 +658,7 @@
                     inj: injectedResult{FilterStatus: int(Error)},
                 },
             },
-            wantCode: Error,
+            wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
         },
         {
             name: "SuccessAndSuccessFilters",
@@ -672,7 +673,7 @@
                     inj: injectedResult{FilterStatus: int(Success)},
                 },
             },
-            wantCode: Success,
+            wantStatus: nil,
         },
         {
             name: "ErrorAndSuccessFilters",
@@ -686,7 +687,7 @@
                     inj: injectedResult{FilterStatus: int(Success)},
                 },
             },
-            wantCode: Error,
+            wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
         },
         {
             name: "SuccessAndErrorFilters",
@@ -701,7 +702,7 @@
                     inj: injectedResult{FilterStatus: int(Error)},
                 },
             },
-            wantCode: Error,
+            wantStatus: NewStatus(Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`),
         },
         {
             name: "SuccessAndUnschedulableFilters",
@@ -716,7 +717,50 @@
                     inj: injectedResult{FilterStatus: int(Unschedulable)},
                 },
             },
-            wantCode: Unschedulable,
+            wantStatus: NewStatus(Unschedulable, "injected filter status"),
+        },
+        {
+            name: "SuccessFilterWithRunAllFilters",
+            plugins: []*TestPlugin{
+                {
+                    name: "TestPlugin",
+                    inj: injectedResult{FilterStatus: int(Success)},
+                },
+            },
+            runAllFilters: true,
+            wantStatus: nil,
+        },
+        {
+            name: "ErrorAndErrorFilters",
+            plugins: []*TestPlugin{
+                {
+                    name: "TestPlugin1",
+                    inj: injectedResult{FilterStatus: int(Error)},
+                },
+                {
+                    name: "TestPlugin2",
+                    inj: injectedResult{FilterStatus: int(Error)},
+                },
+            },
+            runAllFilters: true,
+            wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
+        },
+        {
+            name: "UnschedulableAndUnschedulableFilters",
+            plugins: []*TestPlugin{
+                {
+                    name: "TestPlugin1",
+                    inj: injectedResult{FilterStatus: int(UnschedulableAndUnresolvable)},
+                },
+                {
+                    name: "TestPlugin2",
+                    inj: injectedResult{FilterStatus: int(Unschedulable)},
+                },
+            },
+            runAllFilters: true,
+            wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status", "injected filter status"),
         },
     }
@@ -739,13 +783,13 @@
                 config.Plugin{Name: pl.name})
         }

-        f, err := NewFramework(registry, cfgPls, emptyArgs)
+        f, err := NewFramework(registry, cfgPls, emptyArgs, WithRunAllFilters(tt.runAllFilters))
         if err != nil {
             t.Fatalf("fail to create framework: %s", err)
         }

         status := f.RunFilterPlugins(context.TODO(), nil, pod, nil)
-        if status.Code() != tt.wantCode {
-            t.Errorf("Wrong status code. got: %v, want:%v", status.Code(), tt.wantCode)
+        if !reflect.DeepEqual(status, tt.wantStatus) {
+            t.Errorf("Wrong status. got: %v, want: %v", status, tt.wantStatus)
         }
     })
 }
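One consequential detail in the test changes above: the assertion now compares the whole *Status with reflect.DeepEqual rather than just the code, because with reason accumulation two statuses can share a code yet carry different reasons. A toy illustration of the difference (simplified status type, not the framework's):

package main

import (
	"fmt"
	"reflect"
)

// Toy status: the same shape of comparison the test now performs.
type status struct {
	code    int
	reasons []string
}

func main() {
	got := &status{code: 2, reasons: []string{"injected filter status", "injected filter status"}}
	want := &status{code: 2, reasons: []string{"injected filter status"}}

	// Comparing only the code would pass; DeepEqual also checks the
	// accumulated reasons and catches the difference.
	fmt.Println(got.code == want.code)        // true
	fmt.Println(reflect.DeepEqual(got, want)) // false
}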

View File

@@ -384,12 +384,10 @@ type Framework interface {
     RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
     // RunFilterPlugins runs the set of configured filter plugins for pod on
-    // the given node. It returns directly if any of the filter plugins
-    // return any status other than "Success". Note that for the node being
-    // evaluated, the passed nodeInfo reference could be different from the
-    // one in NodeInfoSnapshot map (e.g., pods considered to be running on
-    // the node could be different). For example, during preemption, we may
-    // pass a copy of the original nodeInfo object that has some pods
+    // the given node. Note that for the node being evaluated, the passed nodeInfo
+    // reference could be different from the one in NodeInfoSnapshot map (e.g., pods
+    // considered to be running on the node could be different). For example, during
+    // preemption, we may pass a copy of the original nodeInfo object that has some pods
     // removed from it to evaluate the possibility of preempting them to
     // schedule the target pod.
     RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
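The sentence deleted above ("returns directly if any filter returns a status other than Success") is dropped because it no longer always holds: with runAllFilters the framework may keep evaluating. The contract for individual plugins is unchanged, though: return nil (Success), an Unschedulable-flavored status with human-readable reasons, or any other status to signal an internal error. A toy plugin against a simplified interface — not the real framework API, which also passes a context, a CycleState, the Pod, and a NodeInfo:

package main

import "fmt"

type Code int

const (
	Success Code = iota
	Unschedulable
	Error
)

type Status struct {
	code    Code
	reasons []string
}

// FilterPlugin is a heavily simplified version of the framework's interface.
type FilterPlugin interface {
	Name() string
	Filter(nodeName string) *Status
}

// nodeNamePlugin rejects every node except the one it was built with.
type nodeNamePlugin struct{ want string }

func (p *nodeNamePlugin) Name() string { return "ToyNodeName" }

func (p *nodeNamePlugin) Filter(nodeName string) *Status {
	if nodeName == p.want {
		return nil // nil means Success, as in the real framework
	}
	// Unschedulable, never Error, for ordinary "this node doesn't fit" cases.
	return &Status{code: Unschedulable, reasons: []string{"node name mismatch"}}
}

func main() {
	var pl FilterPlugin = &nodeNamePlugin{want: "node-a"}
	fmt.Println(pl.Filter("node-a"))         // <nil>
	fmt.Println(pl.Filter("node-b").reasons) // [node name mismatch]
}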

View File

@@ -890,7 +890,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
                 FindErr: findErr,
             },
             eventReason: "FailedScheduling",
-            expectError: fmt.Errorf("error while running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
+            expectError: fmt.Errorf("running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
         },
         {
             name: "assume error",