Feat: unify the status handle when return in bindingCycle (#112103)

Signed-off-by: kerthcet <kerthcet@gmail.com>

Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
Kante Yin 2022-09-09 23:31:23 +08:00 committed by GitHub
parent 2b2be7fa6b
commit 096dafe757
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 127 additions and 88 deletions

View File

@ -250,6 +250,9 @@ func NewStatus(code Code, reasons ...string) *Status {
// AsStatus wraps an error in a Status. // AsStatus wraps an error in a Status.
func AsStatus(err error) *Status { func AsStatus(err error) *Status {
if err == nil {
return nil
}
return &Status{ return &Status{
code: Error, code: Error,
reasons: []string{err.Error()}, reasons: []string{err.Error()},

View File

@ -22,7 +22,7 @@ import (
"math" "math"
"sync/atomic" "sync/atomic"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
) )

View File

@ -1003,8 +1003,13 @@ func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.
for _, pl := range f.preBindPlugins { for _, pl := range f.preBindPlugins {
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName) status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() { if !status.IsSuccess() {
if status.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name())
return status
}
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod)) klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err)) return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
} }
} }
@ -1030,15 +1035,20 @@ func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.Cyc
if len(f.bindPlugins) == 0 { if len(f.bindPlugins) == 0 {
return framework.NewStatus(framework.Skip, "") return framework.NewStatus(framework.Skip, "")
} }
for _, bp := range f.bindPlugins { for _, pl := range f.bindPlugins {
status = f.runBindPlugin(ctx, bp, state, pod, nodeName) status = f.runBindPlugin(ctx, pl, state, pod, nodeName)
if status.IsSkip() { if status.IsSkip() {
continue continue
} }
if !status.IsSuccess() { if !status.IsSuccess() {
if status.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name())
return status
}
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running Bind plugin", "plugin", bp.Name(), "pod", klog.KObj(pod)) klog.ErrorS(err, "Failed running Bind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", bp.Name(), err)) return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", pl.Name(), err))
} }
return status return status
} }

View File

@ -54,6 +54,9 @@ const (
testProfileName = "test-profile" testProfileName = "test-profile"
nodeName = "testNode" nodeName = "testNode"
injectReason = "injected status"
injectFilterReason = "injected filter status"
) )
// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface. // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@ -121,7 +124,7 @@ func (pl *TestScorePlugin) Name() string {
} }
func (pl *TestScorePlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { func (pl *TestScorePlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.PreScoreStatus), "injected status") return framework.NewStatus(framework.Code(pl.inj.PreScoreStatus), injectReason)
} }
func (pl *TestScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { func (pl *TestScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
@ -150,10 +153,10 @@ type TestPlugin struct {
} }
func (pl *TestPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { func (pl *TestPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.PreFilterAddPodStatus), "injected status") return framework.NewStatus(framework.Code(pl.inj.PreFilterAddPodStatus), injectReason)
} }
func (pl *TestPlugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { func (pl *TestPlugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.PreFilterRemovePodStatus), "injected status") return framework.NewStatus(framework.Code(pl.inj.PreFilterRemovePodStatus), injectReason)
} }
func (pl *TestPlugin) Name() string { func (pl *TestPlugin) Name() string {
@ -165,7 +168,7 @@ func (pl *TestPlugin) Less(*framework.QueuedPodInfo, *framework.QueuedPodInfo) b
} }
func (pl *TestPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { func (pl *TestPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
return 0, framework.NewStatus(framework.Code(pl.inj.ScoreStatus), "injected status") return 0, framework.NewStatus(framework.Code(pl.inj.ScoreStatus), injectReason)
} }
func (pl *TestPlugin) ScoreExtensions() framework.ScoreExtensions { func (pl *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
@ -173,7 +176,7 @@ func (pl *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
} }
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) { func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), "injected status") return nil, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), injectReason)
} }
func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions { func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
@ -181,37 +184,37 @@ func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
} }
func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.FilterStatus), "injected filter status") return framework.NewStatus(framework.Code(pl.inj.FilterStatus), injectFilterReason)
} }
func (pl *TestPlugin) PostFilter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { func (pl *TestPlugin) PostFilter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
return nil, framework.NewStatus(framework.Code(pl.inj.PostFilterStatus), "injected status") return nil, framework.NewStatus(framework.Code(pl.inj.PostFilterStatus), injectReason)
} }
func (pl *TestPlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { func (pl *TestPlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.PreScoreStatus), "injected status") return framework.NewStatus(framework.Code(pl.inj.PreScoreStatus), injectReason)
} }
func (pl *TestPlugin) Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { func (pl *TestPlugin) Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.ReserveStatus), "injected status") return framework.NewStatus(framework.Code(pl.inj.ReserveStatus), injectReason)
} }
func (pl *TestPlugin) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) { func (pl *TestPlugin) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
} }
func (pl *TestPlugin) PreBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { func (pl *TestPlugin) PreBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.PreBindStatus), "injected status") return framework.NewStatus(framework.Code(pl.inj.PreBindStatus), injectReason)
} }
func (pl *TestPlugin) PostBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) { func (pl *TestPlugin) PostBind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
} }
func (pl *TestPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) { func (pl *TestPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
return framework.NewStatus(framework.Code(pl.inj.PermitStatus), "injected status"), time.Duration(0) return framework.NewStatus(framework.Code(pl.inj.PermitStatus), injectReason), time.Duration(0)
} }
func (pl *TestPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { func (pl *TestPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.BindStatus), "injected status") return framework.NewStatus(framework.Code(pl.inj.BindStatus), injectReason)
} }
// TestPreFilterPlugin only implements PreFilterPlugin interface. // TestPreFilterPlugin only implements PreFilterPlugin interface.
@ -374,8 +377,8 @@ var nodes = []*v1.Node{
} }
var ( var (
errInjectedStatus = errors.New("injected status") errInjectedStatus = errors.New(injectReason)
errInjectedFilterStatus = errors.New("injected filter status") errInjectedFilterStatus = errors.New(injectFilterReason)
) )
func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) { func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) {
@ -1307,9 +1310,9 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(framework.Unschedulable)}, inj: injectedResult{FilterStatus: int(framework.Unschedulable)},
}, },
}, },
wantStatus: framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin"), wantStatus: framework.NewStatus(framework.Unschedulable, injectFilterReason).WithFailedPlugin("TestPlugin"),
wantStatusMap: framework.PluginToStatus{ wantStatusMap: framework.PluginToStatus{
"TestPlugin": framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin"), "TestPlugin": framework.NewStatus(framework.Unschedulable, injectFilterReason).WithFailedPlugin("TestPlugin"),
}, },
}, },
{ {
@ -1321,9 +1324,9 @@ func TestFilterPlugins(t *testing.T) {
FilterStatus: int(framework.UnschedulableAndUnresolvable)}, FilterStatus: int(framework.UnschedulableAndUnresolvable)},
}, },
}, },
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status").WithFailedPlugin("TestPlugin"), wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, injectFilterReason).WithFailedPlugin("TestPlugin"),
wantStatusMap: framework.PluginToStatus{ wantStatusMap: framework.PluginToStatus{
"TestPlugin": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected filter status").WithFailedPlugin("TestPlugin"), "TestPlugin": framework.NewStatus(framework.UnschedulableAndUnresolvable, injectFilterReason).WithFailedPlugin("TestPlugin"),
}, },
}, },
// following tests cover multiple-plugins scenarios // following tests cover multiple-plugins scenarios
@ -1409,9 +1412,9 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(framework.Unschedulable)}, inj: injectedResult{FilterStatus: int(framework.Unschedulable)},
}, },
}, },
wantStatus: framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin2"), wantStatus: framework.NewStatus(framework.Unschedulable, injectFilterReason).WithFailedPlugin("TestPlugin2"),
wantStatusMap: framework.PluginToStatus{ wantStatusMap: framework.PluginToStatus{
"TestPlugin2": framework.NewStatus(framework.Unschedulable, "injected filter status").WithFailedPlugin("TestPlugin2"), "TestPlugin2": framework.NewStatus(framework.Unschedulable, injectFilterReason).WithFailedPlugin("TestPlugin2"),
}, },
}, },
} }
@ -1467,7 +1470,7 @@ func TestPostFilterPlugins(t *testing.T) {
inj: injectedResult{PostFilterStatus: int(framework.Success)}, inj: injectedResult{PostFilterStatus: int(framework.Success)},
}, },
}, },
wantStatus: framework.NewStatus(framework.Success, "injected status"), wantStatus: framework.NewStatus(framework.Success, injectReason),
}, },
{ {
name: "plugin1 failed to make a Pod schedulable, followed by plugin2 which makes the Pod schedulable", name: "plugin1 failed to make a Pod schedulable, followed by plugin2 which makes the Pod schedulable",
@ -1481,7 +1484,7 @@ func TestPostFilterPlugins(t *testing.T) {
inj: injectedResult{PostFilterStatus: int(framework.Success)}, inj: injectedResult{PostFilterStatus: int(framework.Success)},
}, },
}, },
wantStatus: framework.NewStatus(framework.Success, "injected status"), wantStatus: framework.NewStatus(framework.Success, injectReason),
}, },
{ {
name: "plugin1 makes a Pod schedulable, followed by plugin2 which cannot make the Pod schedulable", name: "plugin1 makes a Pod schedulable, followed by plugin2 which cannot make the Pod schedulable",
@ -1495,7 +1498,7 @@ func TestPostFilterPlugins(t *testing.T) {
inj: injectedResult{PostFilterStatus: int(framework.Unschedulable)}, inj: injectedResult{PostFilterStatus: int(framework.Unschedulable)},
}, },
}, },
wantStatus: framework.NewStatus(framework.Success, "injected status"), wantStatus: framework.NewStatus(framework.Success, injectReason),
}, },
} }
@ -1712,7 +1715,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(framework.Unschedulable)}, inj: injectedResult{PreBindStatus: int(framework.Unschedulable)},
}, },
}, },
wantStatus: framework.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)), wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"),
}, },
{ {
name: "ErrorPreBindPlugin", name: "ErrorPreBindPlugin",
@ -1732,7 +1735,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(framework.UnschedulableAndUnresolvable)}, inj: injectedResult{PreBindStatus: int(framework.UnschedulableAndUnresolvable)},
}, },
}, },
wantStatus: framework.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)), wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, injectReason).WithFailedPlugin("TestPlugin"),
}, },
{ {
name: "SuccessErrorPreBindPlugins", name: "SuccessErrorPreBindPlugins",
@ -1802,7 +1805,7 @@ func TestPreBindPlugins(t *testing.T) {
inj: injectedResult{PreBindStatus: int(framework.Success)}, inj: injectedResult{PreBindStatus: int(framework.Success)},
}, },
}, },
wantStatus: framework.AsStatus(fmt.Errorf(`running PreBind plugin "TestPlugin": %w`, errInjectedStatus)), wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"),
}, },
} }
@ -2028,7 +2031,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(framework.Unschedulable)}, inj: injectedResult{PermitStatus: int(framework.Unschedulable)},
}, },
}, },
want: framework.NewStatus(framework.Unschedulable, "injected status").WithFailedPlugin("TestPlugin"), want: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"),
}, },
{ {
name: "ErrorPermitPlugin", name: "ErrorPermitPlugin",
@ -2048,7 +2051,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(framework.UnschedulableAndUnresolvable)}, inj: injectedResult{PermitStatus: int(framework.UnschedulableAndUnresolvable)},
}, },
}, },
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected status").WithFailedPlugin("TestPlugin"), want: framework.NewStatus(framework.UnschedulableAndUnresolvable, injectReason).WithFailedPlugin("TestPlugin"),
}, },
{ {
name: "WaitPermitPlugin", name: "WaitPermitPlugin",
@ -2341,7 +2344,7 @@ func TestRunBindPlugins(t *testing.T) {
{ {
name: "invalid status", name: "invalid status",
injects: []framework.Code{framework.Unschedulable}, injects: []framework.Code{framework.Unschedulable},
wantStatus: framework.Error, wantStatus: framework.Unschedulable,
}, },
{ {
name: "simple error", name: "simple error",
@ -2356,7 +2359,7 @@ func TestRunBindPlugins(t *testing.T) {
{ {
name: "invalid status, returns error", name: "invalid status, returns error",
injects: []framework.Code{framework.Skip, framework.UnschedulableAndUnresolvable}, injects: []framework.Code{framework.Skip, framework.UnschedulableAndUnresolvable},
wantStatus: framework.Error, wantStatus: framework.UnschedulableAndUnresolvable,
}, },
{ {
name: "error after success status, returns success", name: "error after success status, returns success",

View File

@ -239,9 +239,13 @@ func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.Cycle
// Avoid moving the assumed Pod itself as it's always Unschedulable. // Avoid moving the assumed Pod itself as it's always Unschedulable.
// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways. // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
if waitOnPermitStatus.IsUnschedulable() {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool { defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID return assumedPod.UID != pod.UID
}) })
} else {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
} }
sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode) sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return return
@ -250,7 +254,15 @@ func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.Cycle
// Run "prebind" plugins. // Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() { if !preBindStatus.IsSuccess() {
var reason string
if preBindStatus.IsUnschedulable() {
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonSchedulerError
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
@ -259,15 +271,29 @@ func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.Cycle
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache. // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic. // TODO(#103853): de-duplicate the logic.
if preBindStatus.IsUnschedulable() {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
} else {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
} }
sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), v1.PodReasonSchedulerError, clearNominatedNode) }
sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), reason, clearNominatedNode)
return return
} }
err := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state) bindStatus := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil { if !bindStatus.IsSuccess() {
var reason string
if bindStatus.IsUnschedulable() {
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
reason = v1.PodReasonSchedulerError
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if err := sched.Cache.ForgetPod(assumedPod); err != nil { if err := sched.Cache.ForgetPod(assumedPod); err != nil {
@ -276,9 +302,15 @@ func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.Cycle
// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
// as the assumed Pod had occupied a certain amount of resources in scheduler cache. // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
// TODO(#103853): de-duplicate the logic. // TODO(#103853): de-duplicate the logic.
if bindStatus.IsUnschedulable() {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
} else {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
} }
sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), v1.PodReasonSchedulerError, clearNominatedNode) }
sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", bindStatus.AsError()), reason, clearNominatedNode)
return return
} }
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
@ -773,23 +805,16 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// bind binds a pod to a given node defined in a binding object. // bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins. // The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally. // We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (status *framework.Status) {
defer func() { defer func() {
sched.finishBinding(fwk, assumed, targetNode, err) sched.finishBinding(fwk, assumed, targetNode, status)
}() }()
bound, err := sched.extendersBinding(assumed, targetNode) bound, err := sched.extendersBinding(assumed, targetNode)
if bound { if bound {
return err return framework.AsStatus(err)
} }
bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode) return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
if bindStatus.IsSuccess() {
return nil
}
if bindStatus.Code() == framework.Error {
return bindStatus.AsError()
}
return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
} }
// TODO(#87159): Move this to a Plugin. // TODO(#87159): Move this to a Plugin.
@ -806,11 +831,11 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error)
return false, nil return false, nil
} }
func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) { func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, status *framework.Status) {
if finErr := sched.Cache.FinishBinding(assumed); finErr != nil { if finErr := sched.Cache.FinishBinding(assumed); finErr != nil {
klog.ErrorS(finErr, "Scheduler cache FinishBinding failed") klog.ErrorS(finErr, "Scheduler cache FinishBinding failed")
} }
if err != nil { if !status.IsSuccess() {
klog.V(1).InfoS("Failed to bind pod", "pod", klog.KObj(assumed)) klog.V(1).InfoS("Failed to bind pod", "pod", klog.KObj(assumed))
return return
} }

View File

@ -1061,9 +1061,9 @@ func TestSchedulerBinding(t *testing.T) {
nodeInfoSnapshot: nil, nodeInfoSnapshot: nil,
percentageOfNodesToScore: 0, percentageOfNodesToScore: 0,
} }
err = sched.bind(ctx, fwk, pod, "node", nil) status := sched.bind(ctx, fwk, pod, "node", nil)
if err != nil { if !status.IsSuccess() {
t.Error(err) t.Error(status.AsError())
} }
// Checking default binding. // Checking default binding.

View File

@ -988,12 +988,6 @@ func TestPrebindPlugin(t *testing.T) {
reject: false, reject: false,
succeedOnRetry: true, succeedOnRetry: true,
}, },
{
name: "reject on 1st try but succeed on retry",
fail: false,
reject: true,
succeedOnRetry: true,
},
{ {
name: "failure on preBind moves unschedulable pods", name: "failure on preBind moves unschedulable pods",
fail: true, fail: true,
@ -1022,7 +1016,7 @@ func TestPrebindPlugin(t *testing.T) {
t.Errorf("Error while creating a test pod: %v", err) t.Errorf("Error while creating a test pod: %v", err)
} }
if test.fail || test.reject { if test.fail {
if test.succeedOnRetry { if test.succeedOnRetry {
if err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil { if err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable on retry, but got an error: %v", err) t.Errorf("Expected the pod to be schedulable on retry, but got an error: %v", err)
@ -1030,6 +1024,10 @@ func TestPrebindPlugin(t *testing.T) {
} else if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { } else if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
} }
} else if test.reject {
if err = testutils.WaitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected the pod to be unschedulable")
}
} else if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { } else if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err) t.Errorf("Expected the pod to be scheduled. error: %v", err)
} }
@ -1212,11 +1210,11 @@ func TestUnReservePermitPlugins(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
plugin *PermitPlugin plugin *PermitPlugin
fail bool reject bool
}{ }{
{ {
name: "All Reserve plugins passed, but a Permit plugin was rejected", name: "All Reserve plugins passed, but a Permit plugin was rejected",
fail: true, reject: true,
plugin: &PermitPlugin{ plugin: &PermitPlugin{
name: "rejectedPermitPlugin", name: "rejectedPermitPlugin",
rejectPermit: true, rejectPermit: true,
@ -1224,7 +1222,7 @@ func TestUnReservePermitPlugins(t *testing.T) {
}, },
{ {
name: "All Reserve plugins passed, but a Permit plugin timeout in waiting", name: "All Reserve plugins passed, but a Permit plugin timeout in waiting",
fail: true, reject: true,
plugin: &PermitPlugin{ plugin: &PermitPlugin{
name: "timeoutPermitPlugin", name: "timeoutPermitPlugin",
timeoutPermit: true, timeoutPermit: true,
@ -1232,7 +1230,7 @@ func TestUnReservePermitPlugins(t *testing.T) {
}, },
{ {
name: "The Permit plugin succeed", name: "The Permit plugin succeed",
fail: false, reject: false,
plugin: &PermitPlugin{ plugin: &PermitPlugin{
name: "succeedPermitPlugin", name: "succeedPermitPlugin",
}, },
@ -1264,7 +1262,7 @@ func TestUnReservePermitPlugins(t *testing.T) {
t.Errorf("Error while creating a test pod: %v", err) t.Errorf("Error while creating a test pod: %v", err)
} }
if test.fail { if test.reject {
if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
} }
@ -1298,11 +1296,11 @@ func TestUnReservePreBindPlugins(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
plugin *PreBindPlugin plugin *PreBindPlugin
fail bool wantReject bool
}{ }{
{ {
name: "All Reserve plugins passed, but a PreBind plugin failed", name: "All Reserve plugins passed, but a PreBind plugin failed",
fail: true, wantReject: true,
plugin: &PreBindPlugin{ plugin: &PreBindPlugin{
podUIDs: make(map[types.UID]struct{}), podUIDs: make(map[types.UID]struct{}),
rejectPreBind: true, rejectPreBind: true,
@ -1310,7 +1308,7 @@ func TestUnReservePreBindPlugins(t *testing.T) {
}, },
{ {
name: "All Reserve plugins passed, and PreBind plugin succeed", name: "All Reserve plugins passed, and PreBind plugin succeed",
fail: false, wantReject: false,
plugin: &PreBindPlugin{podUIDs: make(map[types.UID]struct{})}, plugin: &PreBindPlugin{podUIDs: make(map[types.UID]struct{})},
}, },
} }
@ -1340,8 +1338,8 @@ func TestUnReservePreBindPlugins(t *testing.T) {
t.Errorf("Error while creating a test pod: %v", err) t.Errorf("Error while creating a test pod: %v", err)
} }
if test.fail { if test.wantReject {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected a reasons other than Unschedulable, but got: %v", err) t.Errorf("Expected a reasons other than Unschedulable, but got: %v", err)
} }