Merge pull request #114082 from kidddddddddddddddddddddd/refactor_handleSchedulingFailure

pass status to handleSchedulingFailure
Kubernetes Prow Robot 2022-12-12 22:05:34 -08:00 committed by GitHub
commit dc1e77143f
6 changed files with 48 additions and 46 deletions
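
In short: instead of threading a separate error and reason string through the failure path, schedulingCycle and handleBindingCycleError now hand handleSchedulingFailure a single *framework.Status, and the pod condition reason is derived from the status code. A minimal sketch of that mapping, written against the scheduler framework package (illustrative only; the function names in the sketch are hypothetical and not part of this diff):

package sketch

import (
	"errors"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// reasonFor mirrors the derivation added to handleSchedulingFailure below:
// an unschedulable status maps to PodReasonUnschedulable, everything else
// (e.g. framework.AsStatus(err)) maps to PodReasonSchedulerError.
func reasonFor(status *framework.Status) string {
	if status.IsUnschedulable() {
		return v1.PodReasonUnschedulable
	}
	return v1.PodReasonSchedulerError
}

// example builds the two kinds of failure status used throughout this change.
func example() (string, string) {
	unschedulable := framework.NewStatus(framework.Unschedulable).WithError(errors.New("0/3 nodes are available"))
	internal := framework.AsStatus(errors.New("cache inconsistency"))
	return reasonFor(unschedulable), reasonFor(internal)
}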


@@ -159,6 +159,11 @@ type Status struct {
     failedPlugin string
 }
+func (s *Status) WithError(err error) *Status {
+    s.err = err
+    return s
+}
 // Code returns code of the Status.
 func (s *Status) Code() Code {
     if s == nil {
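
For context on the hunk above: the new WithError helper only attaches the underlying error to an existing Status, leaving its code untouched. A small sketch (it assumes the pre-existing Status.AsError behavior of preferring the wrapped error when one is set, and reuses the framework import from the sketch near the top):

// wrapExample shows the round trip the rest of this diff relies on:
// the code stays Unschedulable and the original error comes back out.
func wrapExample(err error) (framework.Code, error) {
	st := framework.NewStatus(framework.Unschedulable).WithError(err)
	return st.Code(), st.AsError()
}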


@@ -92,9 +92,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
     schedulingCycleCtx, cancel := context.WithCancel(ctx)
     defer cancel()
-    scheduleResult, assumedPodInfo, err := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
-    if err != nil {
-        sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, err, scheduleResult.reason, scheduleResult.nominatingInfo, start)
+    scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
+    if !status.IsSuccess() {
+        sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
         return
     }
@@ -125,7 +125,7 @@ func (sched *Scheduler) schedulingCycle(
     podInfo *framework.QueuedPodInfo,
     start time.Time,
     podsToActivate *framework.PodsToActivate,
-) (ScheduleResult, *framework.QueuedPodInfo, error) {
+) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
     pod := podInfo.Pod
     scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
@@ -134,8 +134,10 @@ func (sched *Scheduler) schedulingCycle(
         // preempt, with the expectation that the next time the pod is tried for scheduling it
         // will fit due to the preemption. It is also possible that a different pod will schedule
         // into the resources that were preempted, but this is harmless.
-        var nominatingInfo *framework.NominatingInfo
-        reason := v1.PodReasonUnschedulable
+        var (
+            nominatingInfo *framework.NominatingInfo
+            status *framework.Status
+        )
         if fitError, ok := err.(*framework.FitError); ok {
             if !fwk.HasPostFilterPlugins() {
                 klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
@@ -153,15 +155,19 @@ func (sched *Scheduler) schedulingCycle(
                     nominatingInfo = result.NominatingInfo
                 }
             }
+            status = framework.NewStatus(framework.Unschedulable).WithError(err)
         } else if err == ErrNoNodesAvailable {
             nominatingInfo = clearNominatedNode
+            status = framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
         } else {
             klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
             nominatingInfo = clearNominatedNode
-            reason = v1.PodReasonSchedulerError
+            status = framework.AsStatus(err)
         }
-        return ScheduleResult{nominatingInfo: nominatingInfo, reason: reason}, podInfo, err
+        return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, status
     }
     metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
     // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
     // This allows us to keep scheduling without waiting on binding to occur.
@@ -175,9 +181,9 @@ func (sched *Scheduler) schedulingCycle(
         // This relies on the fact that Error will check if the pod has been bound
         // to a node and if so will not add it back to the unscheduled pods queue
         // (otherwise this would cause an infinite loop).
-        return ScheduleResult{nominatingInfo: clearNominatedNode, reason: v1.PodReasonSchedulerError},
+        return ScheduleResult{nominatingInfo: clearNominatedNode},
             assumedPodInfo,
-            err
+            framework.AsStatus(err)
     }
     // Run the Reserve method of reserve plugins.
@@ -188,9 +194,9 @@ func (sched *Scheduler) schedulingCycle(
             klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
         }
-        return ScheduleResult{nominatingInfo: clearNominatedNode, reason: v1.PodReasonSchedulerError},
+        return ScheduleResult{nominatingInfo: clearNominatedNode},
             assumedPodInfo,
-            sts.AsError()
+            sts
     }
     // Run "permit" plugins.
@@ -202,14 +208,9 @@ func (sched *Scheduler) schedulingCycle(
             klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
         }
-        reason := v1.PodReasonSchedulerError
-        if runPermitStatus.IsUnschedulable() {
-            reason = v1.PodReasonUnschedulable
-        }
-        return ScheduleResult{nominatingInfo: clearNominatedNode, reason: reason},
+        return ScheduleResult{nominatingInfo: clearNominatedNode},
             assumedPodInfo,
-            runPermitStatus.AsError()
+            runPermitStatus
     }
     // At the end of a successful scheduling cycle, pop and move up Pods if needed.
@@ -298,12 +299,7 @@ func (sched *Scheduler) handleBindingCycleError(
         }
     }
-    reason := v1.PodReasonSchedulerError
-    if status.IsUnschedulable() {
-        reason = v1.PodReasonUnschedulable
-    }
-    sched.FailureHandler(ctx, fwk, podInfo, status.AsError(), reason, clearNominatedNode, start)
+    sched.FailureHandler(ctx, fwk, podInfo, status, clearNominatedNode, start)
 }
 func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
@@ -848,7 +844,12 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
 // handleSchedulingFailure records an event for the pod that indicates the
 // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
-func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo, start time.Time) {
+func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
+    reason := v1.PodReasonSchedulerError
+    if status.IsUnschedulable() {
+        reason = v1.PodReasonUnschedulable
+    }
     switch reason {
     case v1.PodReasonUnschedulable:
         metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
@@ -857,12 +858,14 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
     }
     pod := podInfo.Pod
+    err := status.AsError()
     var errMsg string
     if err != nil {
         errMsg = err.Error()
     }
     if err == ErrNoNodesAvailable {
-        klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
+        klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod), "err", err)
     } else if fitError, ok := err.(*framework.FitError); ok {
         // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
         podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
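
One detail worth calling out for the hunk above: the err == ErrNoNodesAvailable comparison keeps working after the refactor because schedulingCycle wraps that exact sentinel via WithError and AsError hands the same value back. A sketch of the round trip, written as if it lived inside the scheduler package and assuming the existing AsError behavior:

// noNodesRoundTrip returns true: wrapping the sentinel in a status and
// unwrapping it with AsError preserves the == comparison used above.
func noNodesRoundTrip() bool {
	st := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(ErrNoNodesAvailable)
	return st.AsError() == ErrNoNodesAvailable
}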


@@ -601,11 +601,11 @@ func TestSchedulerScheduleOne(t *testing.T) {
             sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
                 return item.mockResult.result, item.mockResult.err
             }
-            sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo, _ time.Time) {
+            sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
                 gotPod = p.Pod
-                gotError = err
+                gotError = status.AsError()
-                msg := truncateMessage(err.Error())
+                msg := truncateMessage(gotError.Error())
                 fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
             }
             called := make(chan struct{})
@@ -2858,7 +2858,8 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
     }
     sched.SchedulePod = sched.schedulePod
-    sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo, _ time.Time) {
+    sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
+        err := status.AsError()
         errChan <- err
         msg := truncateMessage(err.Error())


@@ -132,9 +132,6 @@ type ScheduleResult struct {
     EvaluatedNodes int
     // The number of nodes out of the evaluated ones that fit the pod.
     FeasibleNodes int
-    // The reason records the failure in scheduling cycle.
-    reason string
     // The nominating info for scheduling cycle.
     nominatingInfo *framework.NominatingInfo
 }
@@ -430,7 +427,7 @@ func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.K
     return fExtenders, nil
 }
-type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo, start time.Time)
+type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)
 func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
     gvkMap := make(map[framework.GVK]framework.ActionType)
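
Since FailureHandlerFn is the hook that tests and custom integrations override, here is a hedged sketch of what an override looks like under the new signature (a hypothetical handler for illustration, mirroring the test overrides elsewhere in this diff; it assumes the scheduler package's usual framework and klog imports):

// overrideFailureHandler installs a custom handler; the error is no longer a
// separate parameter and is recovered from the status instead.
func overrideFailureHandler(sched *Scheduler) {
	sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
		if err := status.AsError(); err != nil {
			klog.ErrorS(err, "Scheduling failed", "pod", klog.KObj(podInfo.Pod))
		}
	}
}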


@@ -238,25 +238,21 @@ func TestFailureHandler(t *testing.T) {
     tests := []struct {
         name string
-        injectErr error
         podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
         podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
         expect *v1.Pod
     }{
         {
             name: "pod is updated during a scheduling cycle",
-            injectErr: nil,
             podUpdatedDuringScheduling: true,
             expect: testPodUpdated,
         },
         {
-            name:      "pod is not updated during a scheduling cycle",
-            injectErr: nil,
-            expect:    testPod,
+            name:   "pod is not updated during a scheduling cycle",
+            expect: testPod,
         },
         {
             name: "pod is deleted during a scheduling cycle",
-            injectErr: nil,
             podDeletedDuringScheduling: true,
             expect: nil,
         },
@@ -294,7 +290,7 @@ func TestFailureHandler(t *testing.T) {
             }
             testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-            s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil, time.Now())
+            s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable), nil, time.Now())
             var got *v1.Pod
             if tt.podUpdatedDuringScheduling {
@@ -369,7 +365,7 @@ func TestFailureHandler_NodeNotFound(t *testing.T) {
             }
             testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-            s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil, time.Now())
+            s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(tt.injectErr), nil, time.Now())
             gotNodes := schedulerCache.Dump().Nodes
             gotNodeNames := sets.NewString()
@@ -408,7 +404,7 @@ func TestFailureHandler_PodAlreadyBound(t *testing.T) {
     }
     testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-    s.FailureHandler(ctx, fwk, testPodInfo, fmt.Errorf("binding rejected: timeout"), v1.PodReasonUnschedulable, nil, time.Now())
+    s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(fmt.Errorf("binding rejected: timeout")), nil, time.Now())
     pod := getPodFromPriorityQueue(queue, testPod)
     if pod != nil {


@@ -233,7 +233,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
         if fitError == nil {
             t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
         }
-        testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil, time.Now())
+        testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, framework.NewStatus(framework.Unschedulable).WithError(fitError), nil, time.Now())
     }
     // Trigger a NodeTaintChange event.
@@ -409,7 +409,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
     if fitError == nil {
         t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
     }
-    testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil, time.Now())
+    testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, framework.NewStatus(framework.Unschedulable).WithError(fitError), nil, time.Now())
     // Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so
     // pass a number larger than 1 to move Pod to unschedulablePods.