Update scheduler's RunFilterPlugins to return a plugin to status map

This commit is contained in:
Abdullah Gharaibeh 2020-01-08 13:26:10 -05:00
parent 79bb357193
commit f3c7a4c823
6 changed files with 116 additions and 28 deletions

View File

@ -616,7 +616,8 @@ func (g *genericScheduler) podFitsOnNode(
break break
} }
status = g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) statusMap := g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
status = statusMap.Merge()
if !status.IsSuccess() && !status.IsUnschedulable() { if !status.IsSuccess() && !status.IsUnschedulable() {
return false, status, status.AsError() return false, status, status.AsError()
} }

View File

@ -415,37 +415,36 @@ func (f *framework) RunFilterPlugins(
state *CycleState, state *CycleState,
pod *v1.Pod, pod *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo, nodeInfo *schedulernodeinfo.NodeInfo,
) (finalStatus *Status) { ) PluginToStatus {
var firstFailedStatus *Status
if state.ShouldRecordFrameworkMetrics() { if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(filter, finalStatus, metrics.SinceInSeconds(startTime)) f.metricsRecorder.observeExtensionPointDurationAsync(filter, firstFailedStatus, metrics.SinceInSeconds(startTime))
}() }()
} }
statuses := make(PluginToStatus)
for _, pl := range f.filterPlugins { for _, pl := range f.filterPlugins {
pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
if len(statuses) == 0 {
firstFailedStatus = pluginStatus
}
if !pluginStatus.IsSuccess() { if !pluginStatus.IsSuccess() {
if !pluginStatus.IsUnschedulable() { if !pluginStatus.IsUnschedulable() {
// Filter plugins are not supposed to return any status other than // Filter plugins are not supposed to return any status other than
// Success or Unschedulable. // Success or Unschedulable.
return NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message())) firstFailedStatus = NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
return map[string]*Status{pl.Name(): firstFailedStatus}
} }
statuses[pl.Name()] = pluginStatus
if !f.runAllFilters { if !f.runAllFilters {
// Exit early if we don't need to run all filters. // Exit early if we don't need to run all filters.
return pluginStatus return statuses
} }
// We need to continue and run all filters.
if finalStatus.IsSuccess() {
// This is the first failed plugin.
finalStatus = pluginStatus
continue
}
// We get here only if more than one Filter return unschedulable and runAllFilters is true.
finalStatus.reasons = append(finalStatus.reasons, pluginStatus.reasons...)
} }
} }
return finalStatus return statuses
} }
func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {

View File

@ -601,6 +601,7 @@ func TestFilterPlugins(t *testing.T) {
name string name string
plugins []*TestPlugin plugins []*TestPlugin
wantStatus *Status wantStatus *Status
wantStatusMap PluginToStatus
runAllFilters bool runAllFilters bool
}{ }{
{ {
@ -611,7 +612,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Success)}, inj: injectedResult{FilterStatus: int(Success)},
}, },
}, },
wantStatus: nil, wantStatus: nil,
wantStatusMap: PluginToStatus{},
}, },
{ {
name: "ErrorFilter", name: "ErrorFilter",
@ -621,7 +623,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Error)}, inj: injectedResult{FilterStatus: int(Error)},
}, },
}, },
wantStatus: NewStatus(Error, `running "TestPlugin" filter plugin for pod "": injected filter status`), wantStatus: NewStatus(Error, `running "TestPlugin" filter plugin for pod "": injected filter status`),
wantStatusMap: PluginToStatus{"TestPlugin": NewStatus(Error, `running "TestPlugin" filter plugin for pod "": injected filter status`)},
}, },
{ {
name: "UnschedulableFilter", name: "UnschedulableFilter",
@ -631,7 +634,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Unschedulable)}, inj: injectedResult{FilterStatus: int(Unschedulable)},
}, },
}, },
wantStatus: NewStatus(Unschedulable, "injected filter status"), wantStatus: NewStatus(Unschedulable, "injected filter status"),
wantStatusMap: PluginToStatus{"TestPlugin": NewStatus(Unschedulable, "injected filter status")},
}, },
{ {
name: "UnschedulableAndUnresolvableFilter", name: "UnschedulableAndUnresolvableFilter",
@ -642,7 +646,8 @@ func TestFilterPlugins(t *testing.T) {
FilterStatus: int(UnschedulableAndUnresolvable)}, FilterStatus: int(UnschedulableAndUnresolvable)},
}, },
}, },
wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status"), wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status"),
wantStatusMap: PluginToStatus{"TestPlugin": NewStatus(UnschedulableAndUnresolvable, "injected filter status")},
}, },
// followings tests cover multiple-plugins scenarios // followings tests cover multiple-plugins scenarios
{ {
@ -658,7 +663,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Error)}, inj: injectedResult{FilterStatus: int(Error)},
}, },
}, },
wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`), wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
wantStatusMap: PluginToStatus{"TestPlugin1": NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`)},
}, },
{ {
name: "SuccessAndSuccessFilters", name: "SuccessAndSuccessFilters",
@ -673,7 +679,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Success)}, inj: injectedResult{FilterStatus: int(Success)},
}, },
}, },
wantStatus: nil, wantStatus: nil,
wantStatusMap: PluginToStatus{},
}, },
{ {
name: "ErrorAndSuccessFilters", name: "ErrorAndSuccessFilters",
@ -687,7 +694,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Success)}, inj: injectedResult{FilterStatus: int(Success)},
}, },
}, },
wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`), wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
wantStatusMap: PluginToStatus{"TestPlugin1": NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`)},
}, },
{ {
name: "SuccessAndErrorFilters", name: "SuccessAndErrorFilters",
@ -702,7 +710,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Error)}, inj: injectedResult{FilterStatus: int(Error)},
}, },
}, },
wantStatus: NewStatus(Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`), wantStatus: NewStatus(Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`),
wantStatusMap: PluginToStatus{"TestPlugin2": NewStatus(Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`)},
}, },
{ {
name: "SuccessAndUnschedulableFilters", name: "SuccessAndUnschedulableFilters",
@ -717,7 +726,8 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(Unschedulable)}, inj: injectedResult{FilterStatus: int(Unschedulable)},
}, },
}, },
wantStatus: NewStatus(Unschedulable, "injected filter status"), wantStatus: NewStatus(Unschedulable, "injected filter status"),
wantStatusMap: PluginToStatus{"TestPlugin2": NewStatus(Unschedulable, "injected filter status")},
}, },
{ {
name: "SuccessFilterWithRunAllFilters", name: "SuccessFilterWithRunAllFilters",
@ -729,6 +739,7 @@ func TestFilterPlugins(t *testing.T) {
}, },
runAllFilters: true, runAllFilters: true,
wantStatus: nil, wantStatus: nil,
wantStatusMap: PluginToStatus{},
}, },
{ {
name: "ErrorAndErrorFilters", name: "ErrorAndErrorFilters",
@ -745,6 +756,7 @@ func TestFilterPlugins(t *testing.T) {
}, },
runAllFilters: true, runAllFilters: true,
wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`), wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
wantStatusMap: PluginToStatus{"TestPlugin1": NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`)},
}, },
{ {
name: "ErrorAndErrorFilters", name: "ErrorAndErrorFilters",
@ -761,6 +773,10 @@ func TestFilterPlugins(t *testing.T) {
}, },
runAllFilters: true, runAllFilters: true,
wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status", "injected filter status"), wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status", "injected filter status"),
wantStatusMap: PluginToStatus{
"TestPlugin1": NewStatus(UnschedulableAndUnresolvable, "injected filter status"),
"TestPlugin2": NewStatus(Unschedulable, "injected filter status"),
},
}, },
} }
@ -787,10 +803,15 @@ func TestFilterPlugins(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
status := f.RunFilterPlugins(context.TODO(), nil, pod, nil) gotStatusMap := f.RunFilterPlugins(context.TODO(), nil, pod, nil)
if !reflect.DeepEqual(status, tt.wantStatus) { gotStatus := gotStatusMap.Merge()
t.Errorf("Wrong status code. got: %v, want:%v", status, tt.wantStatus) if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("wrong status code. got: %v, want:%v", gotStatus, tt.wantStatus)
} }
if !reflect.DeepEqual(gotStatusMap, tt.wantStatusMap) {
t.Errorf("wrong status map. got: %+v, want: %+v", gotStatusMap, tt.wantStatusMap)
}
}) })
} }
} }

View File

@ -155,6 +155,43 @@ func NewStatus(code Code, reasons ...string) *Status {
} }
} }
// PluginToStatus maps plugin name to status. Currently used to identify which Filter plugin
// returned which status.
type PluginToStatus map[string]*Status
// Merge merges the statuses in the map into one. The resulting status code have the following
// precedence: Error, UnschedulableAndUnresolvable, Unschedulable.
func (p PluginToStatus) Merge() *Status {
if len(p) == 0 {
return nil
}
finalStatus := NewStatus(Success)
var hasError, hasUnschedulableAndUnresolvable, hasUnschedulable bool
for _, s := range p {
if s.Code() == Error {
hasError = true
} else if s.Code() == UnschedulableAndUnresolvable {
hasUnschedulableAndUnresolvable = true
} else if s.Code() == Unschedulable {
hasUnschedulable = true
}
finalStatus.code = s.Code()
for _, r := range s.reasons {
finalStatus.AppendReason(r)
}
}
if hasError {
finalStatus.code = Error
} else if hasUnschedulableAndUnresolvable {
finalStatus.code = UnschedulableAndUnresolvable
} else if hasUnschedulable {
finalStatus.code = Unschedulable
}
return finalStatus
}
// WaitingPod represents a pod currently waiting in the permit phase. // WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface { type WaitingPod interface {
// GetPod returns a reference to the waiting pod. // GetPod returns a reference to the waiting pod.
@ -390,7 +427,7 @@ type Framework interface {
// preemption, we may pass a copy of the original nodeInfo object that has some pods // preemption, we may pass a copy of the original nodeInfo object that has some pods
// removed from it to evaluate the possibility of preempting them to // removed from it to evaluate the possibility of preempting them to
// schedule the target pod. // schedule the target pod.
RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) PluginToStatus
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any // PreFilter plugins. It returns directly if any of the plugins return any

View File

@ -90,3 +90,33 @@ func assertStatusCode(t *testing.T, code Code, value int) {
t.Errorf("Status code %q should have a value of %v but got %v", code.String(), value, int(code)) t.Errorf("Status code %q should have a value of %v but got %v", code.String(), value, int(code))
} }
} }
func TestPluginToStatusMerge(t *testing.T) {
tests := []struct {
statusMap PluginToStatus
wantCode Code
}{
{
statusMap: PluginToStatus{"p1": NewStatus(Error), "p2": NewStatus(Unschedulable)},
wantCode: Error,
},
{
statusMap: PluginToStatus{"p1": NewStatus(Success), "p2": NewStatus(Unschedulable)},
wantCode: Unschedulable,
},
{
statusMap: PluginToStatus{"p1": NewStatus(Success), "p2": NewStatus(UnschedulableAndUnresolvable), "p3": NewStatus(Unschedulable)},
wantCode: UnschedulableAndUnresolvable,
},
{
wantCode: Success,
},
}
for i, test := range tests {
gotStatus := test.statusMap.Merge()
if test.wantCode != gotStatus.Code() {
t.Errorf("test #%v, wantCode %v, gotCode %v", i, test.wantCode, gotStatus.Code())
}
}
}

View File

@ -195,7 +195,7 @@ func (*fakeFramework) RunPreFilterPlugins(ctx context.Context, state *framework.
return nil return nil
} }
func (*fakeFramework) RunFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { func (*fakeFramework) RunFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) framework.PluginToStatus {
return nil return nil
} }