Distinguish unschedulable with unresolvable in scheduler

Before, in RunPostFilterPlugins, we didn't distinguish between unschedulable and unresolvable
because we only have one postFilterPlugin by default, now, we have at least two, we should
make sure that once a postFilterPlugin returns unresolvable, we'll return directly

Signed-off-by: Kante Yin <kerthcet@gmail.com>
This commit is contained in:
Kante Yin 2023-01-05 16:59:19 +08:00
parent 41b442534d
commit 2c205e291d
5 changed files with 120 additions and 104 deletions

View File

@ -79,13 +79,14 @@ const (
// Error is used for internal plugin errors, unexpected input, etc. // Error is used for internal plugin errors, unexpected input, etc.
Error Error
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to // Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the // run other postFilter plugins like preemption to get this pod scheduled.
// scheduler skip preemption. // Use UnschedulableAndUnresolvable to make the scheduler skipping other postFilter plugins.
// The accompanying status message should explain why the pod is unschedulable. // The accompanying status message should explain why the pod is unschedulable.
Unschedulable Unschedulable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and // UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible // other postFilter plugins like preemption would not change anything.
// that the pod can get scheduled with preemption. // Plugins should return Unschedulable if it is possible that the pod can get scheduled
// after running other postFilter plugins.
// The accompanying status message should explain why the pod is unschedulable. // The accompanying status message should explain why the pod is unschedulable.
UnschedulableAndUnresolvable UnschedulableAndUnresolvable
// Wait is used when a Permit plugin finds a pod scheduling should wait. // Wait is used when a Permit plugin finds a pod scheduling should wait.
@ -97,15 +98,6 @@ const (
// This list should be exactly the same as the codes iota defined above in the same order. // This list should be exactly the same as the codes iota defined above in the same order.
var codes = []string{"Success", "Error", "Unschedulable", "UnschedulableAndUnresolvable", "Wait", "Skip"} var codes = []string{"Success", "Error", "Unschedulable", "UnschedulableAndUnresolvable", "Wait", "Skip"}
// statusPrecedence defines a map from status to its precedence, larger value means higher precedent.
var statusPrecedence = map[Code]int{
Error: 3,
UnschedulableAndUnresolvable: 2,
Unschedulable: 1,
// Any other statuses we know today, `Skip` or `Wait`, will take precedence over `Success`.
Success: -1,
}
func (c Code) String() string { func (c Code) String() string {
return codes[c] return codes[c]
} }
@ -255,7 +247,10 @@ func (s *Status) Equal(x *Status) bool {
if !cmp.Equal(s.err, x.err, cmpopts.EquateErrors()) { if !cmp.Equal(s.err, x.err, cmpopts.EquateErrors()) {
return false return false
} }
return cmp.Equal(s.reasons, x.reasons) if !cmp.Equal(s.reasons, x.reasons) {
return false
}
return cmp.Equal(s.failedPlugin, x.failedPlugin)
} }
// NewStatus makes a Status out of the given arguments and returns its pointer. // NewStatus makes a Status out of the given arguments and returns its pointer.
@ -278,36 +273,6 @@ func AsStatus(err error) *Status {
} }
} }
// PluginToStatus maps plugin name to status it returned.
type PluginToStatus map[string]*Status
// Merge merges the statuses in the map into one. The resulting status code have the following
// precedence: Error, UnschedulableAndUnresolvable, Unschedulable.
func (p PluginToStatus) Merge() *Status {
if len(p) == 0 {
return nil
}
finalStatus := NewStatus(Success)
for _, s := range p {
if statusPrecedence[s.Code()] > statusPrecedence[finalStatus.code] {
finalStatus.code = s.Code()
// Same as code, we keep the most relevant failedPlugin in the returned Status.
finalStatus.failedPlugin = s.FailedPlugin()
}
reasons := s.Reasons()
if finalStatus.err == nil {
finalStatus.err = s.err
reasons = s.reasons
}
for _, r := range reasons {
finalStatus.AppendReason(r)
}
}
return finalStatus
}
// WaitingPod represents a pod currently waiting in the permit phase. // WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface { type WaitingPod interface {
// GetPod returns a reference to the waiting pod. // GetPod returns a reference to the waiting pod.

View File

@ -139,43 +139,6 @@ func assertStatusCode(t *testing.T, code Code, value int) {
} }
} }
func TestPluginToStatusMerge(t *testing.T) {
tests := []struct {
name string
statusMap PluginToStatus
wantCode Code
}{
{
name: "merge Error and Unschedulable statuses",
statusMap: PluginToStatus{"p1": NewStatus(Error), "p2": NewStatus(Unschedulable)},
wantCode: Error,
},
{
name: "merge Success and Unschedulable statuses",
statusMap: PluginToStatus{"p1": NewStatus(Success), "p2": NewStatus(Unschedulable)},
wantCode: Unschedulable,
},
{
name: "merge Success, UnschedulableAndUnresolvable and Unschedulable statuses",
statusMap: PluginToStatus{"p1": NewStatus(Success), "p2": NewStatus(UnschedulableAndUnresolvable), "p3": NewStatus(Unschedulable)},
wantCode: UnschedulableAndUnresolvable,
},
{
name: "merge nil status",
wantCode: Success,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotStatus := test.statusMap.Merge()
if test.wantCode != gotStatus.Code() {
t.Errorf("wantCode %v, gotCode %v", test.wantCode, gotStatus.Code())
}
})
}
}
func TestPreFilterResultMerge(t *testing.T) { func TestPreFilterResultMerge(t *testing.T) {
tests := map[string]struct { tests := map[string]struct {
receiver *PreFilterResult receiver *PreFilterResult

View File

@ -752,30 +752,39 @@ func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.Filter
} }
// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first // RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
// Success or Error is met, otherwise continues to execute all plugins. // Success, Error or UnschedulableAndUnresolvable is met; otherwise continues to execute all plugins.
func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) { func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(postFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) metrics.FrameworkExtensionPointDuration.WithLabelValues(postFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}() }()
statuses := make(framework.PluginToStatus)
// `result` records the last meaningful(non-noop) PostFilterResult. // `result` records the last meaningful(non-noop) PostFilterResult.
var result *framework.PostFilterResult var result *framework.PostFilterResult
var reasons []string
var failedPlugin string
for _, pl := range f.postFilterPlugins { for _, pl := range f.postFilterPlugins {
r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap) r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
if s.IsSuccess() { if s.IsSuccess() {
return r, s return r, s
} else if s.Code() == framework.UnschedulableAndUnresolvable {
return r, s.WithFailedPlugin(pl.Name())
} else if !s.IsUnschedulable() { } else if !s.IsUnschedulable() {
// Any status other than Success or Unschedulable is Error. // Any status other than Success, Unschedulable or UnschedulableAndUnresolvable is Error.
return nil, framework.AsStatus(s.AsError()) return nil, framework.AsStatus(s.AsError()).WithFailedPlugin(pl.Name())
} else if r != nil && r.Mode() != framework.ModeNoop { } else if r != nil && r.Mode() != framework.ModeNoop {
result = r result = r
} }
statuses[pl.Name()] = s
reasons = append(reasons, s.Reasons()...)
// Record the first failed plugin unless we proved that
// the latter is more relevant.
if len(failedPlugin) == 0 {
failedPlugin = pl.Name()
}
} }
return result, statuses.Merge() return result, framework.NewStatus(framework.Unschedulable, reasons...).WithFailedPlugin(failedPlugin)
} }
func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {

View File

@ -1676,6 +1676,48 @@ func TestPostFilterPlugins(t *testing.T) {
}, },
wantStatus: framework.NewStatus(framework.Success, injectReason), wantStatus: framework.NewStatus(framework.Success, injectReason),
}, },
{
name: "plugin1 failed to make a Pod schedulable, followed by plugin2 which makes the Pod schedulable",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PostFilterStatus: int(framework.Error)},
},
{
name: "TestPlugin2",
inj: injectedResult{PostFilterStatus: int(framework.Success)},
},
},
wantStatus: framework.AsStatus(fmt.Errorf(injectReason)).WithFailedPlugin("TestPlugin1"),
},
{
name: "plugin1 failed to make a Pod schedulable, followed by plugin2 which makes the Pod unresolvable",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PostFilterStatus: int(framework.Unschedulable)},
},
{
name: "TestPlugin2",
inj: injectedResult{PostFilterStatus: int(framework.UnschedulableAndUnresolvable)},
},
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, injectReason).WithFailedPlugin("TestPlugin2"),
},
{
name: "both plugins failed to make a Pod schedulable",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PostFilterStatus: int(framework.Unschedulable)},
},
{
name: "TestPlugin2",
inj: injectedResult{PostFilterStatus: int(framework.Unschedulable)},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, []string{injectReason, injectReason}...).WithFailedPlugin("TestPlugin1"),
},
} }
for _, tt := range tests { for _, tt := range tests {

View File

@ -96,10 +96,12 @@ type FilterPlugin struct {
} }
type PostFilterPlugin struct { type PostFilterPlugin struct {
name string
fh framework.Handle fh framework.Handle
numPostFilterCalled int numPostFilterCalled int
failPostFilter bool failPostFilter bool
rejectPostFilter bool rejectPostFilter bool
breakPostFilter bool
} }
type ReservePlugin struct { type ReservePlugin struct {
@ -463,7 +465,7 @@ func (pp *PreFilterPlugin) reset() {
// Name returns name of the plugin. // Name returns name of the plugin.
func (pp *PostFilterPlugin) Name() string { func (pp *PostFilterPlugin) Name() string {
return postfilterPluginName return pp.name
} }
func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
@ -486,8 +488,12 @@ func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.Cyc
return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
} }
if pp.rejectPostFilter { if pp.rejectPostFilter {
return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("injecting unschedulable for pod %v", pod.Name))
} }
if pp.breakPostFilter {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("injecting unresolvable for pod %v", pod.Name))
}
return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name)) return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name))
} }
@ -628,7 +634,7 @@ func TestPreFilterPlugin(t *testing.T) {
} }
} }
// TestPostFilterPlugin tests invocation of postfilter plugins. // TestPostFilterPlugin tests invocation of postFilter plugins.
func TestPostFilterPlugin(t *testing.T) { func TestPostFilterPlugin(t *testing.T) {
var numNodes int32 = 1 var numNodes int32 = 1
tests := []struct { tests := []struct {
@ -637,6 +643,9 @@ func TestPostFilterPlugin(t *testing.T) {
rejectFilter bool rejectFilter bool
failScore bool failScore bool
rejectPostFilter bool rejectPostFilter bool
rejectPostFilter2 bool
breakPostFilter bool
breakPostFilter2 bool
expectFilterNumCalled int32 expectFilterNumCalled int32
expectScoreNumCalled int32 expectScoreNumCalled int32
expectPostFilterNumCalled int expectPostFilterNumCalled int
@ -669,7 +678,7 @@ func TestPostFilterPlugin(t *testing.T) {
rejectPostFilter: true, rejectPostFilter: true,
expectFilterNumCalled: numNodes * 2, expectFilterNumCalled: numNodes * 2,
expectScoreNumCalled: 1, expectScoreNumCalled: 1,
expectPostFilterNumCalled: 1, expectPostFilterNumCalled: 2,
}, },
{ {
name: "Score failed and PostFilter failed", name: "Score failed and PostFilter failed",
@ -679,25 +688,53 @@ func TestPostFilterPlugin(t *testing.T) {
rejectPostFilter: true, rejectPostFilter: true,
expectFilterNumCalled: numNodes * 2, expectFilterNumCalled: numNodes * 2,
expectScoreNumCalled: 1, expectScoreNumCalled: 1,
expectPostFilterNumCalled: 2,
},
{
name: "Filter failed and first PostFilter broken",
numNodes: numNodes,
rejectFilter: true,
breakPostFilter: true,
expectFilterNumCalled: numNodes * 2,
expectScoreNumCalled: 0,
expectPostFilterNumCalled: 1, expectPostFilterNumCalled: 1,
}, },
{
name: "Filter failed and second PostFilter broken",
numNodes: numNodes,
rejectFilter: true,
rejectPostFilter: true,
rejectPostFilter2: true,
breakPostFilter2: true,
expectFilterNumCalled: numNodes * 2,
expectScoreNumCalled: 0,
expectPostFilterNumCalled: 2,
},
} }
var postFilterPluginName2 = postfilterPluginName + "2"
for i, tt := range tests { for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
// Create a plugin registry for testing. Register a combination of filter and postFilter plugin. // Create a plugin registry for testing. Register a combination of filter and postFilter plugin.
var ( var (
filterPlugin = &FilterPlugin{} filterPlugin = &FilterPlugin{}
scorePlugin = &ScorePlugin{} scorePlugin = &ScorePlugin{}
postFilterPlugin = &PostFilterPlugin{} postFilterPlugin = &PostFilterPlugin{name: postfilterPluginName}
postFilterPlugin2 = &PostFilterPlugin{name: postFilterPluginName2}
) )
filterPlugin.rejectFilter = tt.rejectFilter filterPlugin.rejectFilter = tt.rejectFilter
scorePlugin.failScore = tt.failScore scorePlugin.failScore = tt.failScore
postFilterPlugin.rejectPostFilter = tt.rejectPostFilter postFilterPlugin.rejectPostFilter = tt.rejectPostFilter
postFilterPlugin2.rejectPostFilter = tt.rejectPostFilter2
postFilterPlugin.breakPostFilter = tt.breakPostFilter
postFilterPlugin2.breakPostFilter = tt.breakPostFilter2
registry := frameworkruntime.Registry{ registry := frameworkruntime.Registry{
filterPluginName: newPlugin(filterPlugin), filterPluginName: newPlugin(filterPlugin),
scorePluginName: newPlugin(scorePlugin), scorePluginName: newPlugin(scorePlugin),
postfilterPluginName: newPlugin(postFilterPlugin), postfilterPluginName: newPlugin(postFilterPlugin),
postFilterPluginName2: newPlugin(postFilterPlugin2),
} }
// Setup plugins for testing. // Setup plugins for testing.
@ -723,9 +760,10 @@ func TestPostFilterPlugin(t *testing.T) {
PostFilter: configv1.PluginSet{ PostFilter: configv1.PluginSet{
Enabled: []configv1.Plugin{ Enabled: []configv1.Plugin{
{Name: postfilterPluginName}, {Name: postfilterPluginName},
{Name: postFilterPluginName2},
}, },
// Need to disable default in-tree PostFilter plugins, as they will // Need to disable default in-tree PostFilter plugins, as they will
// call RunFilterPlugins and hence impact the "numFilterCalled". // call RunPostFilterPlugins and hence impact the "numPostFilterCalled".
Disabled: []configv1.Plugin{ Disabled: []configv1.Plugin{
{Name: "*"}, {Name: "*"},
}, },
@ -760,9 +798,6 @@ func TestPostFilterPlugin(t *testing.T) {
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled { if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled {
t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
} }
if postFilterPlugin.numPostFilterCalled < tt.expectPostFilterNumCalled {
t.Errorf("Expected the postfilter plugin to be called at least %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled)
}
} else { } else {
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err) t.Errorf("Expected the pod to be scheduled. error: %v", err)
@ -773,9 +808,11 @@ func TestPostFilterPlugin(t *testing.T) {
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled { if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled {
t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
} }
if postFilterPlugin.numPostFilterCalled != tt.expectPostFilterNumCalled { }
t.Errorf("Expected the postfilter plugin to be called %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled)
} numPostFilterCalled := postFilterPlugin.numPostFilterCalled + postFilterPlugin2.numPostFilterCalled
if numPostFilterCalled != tt.expectPostFilterNumCalled {
t.Errorf("Expected the postfilter plugin to be called %v times, but got %v.", tt.expectPostFilterNumCalled, numPostFilterCalled)
} }
}) })
} }