From 6ca62eb2cb766de620b4d034f87def1ede2451ec Mon Sep 17 00:00:00 2001
From: kidddddddddddddddddddddd <1062602710@qq.com>
Date: Wed, 7 Dec 2022 18:46:36 +0800
Subject: [PATCH] scheduler: refactor failure handling to use *framework.Status

The scheduling cycle and the FailureHandler previously threaded an
error plus a separate reason string (via ScheduleResult.reason) through
every failure path. Return a single *framework.Status instead: the
status carries the underlying error via the new WithError helper, and
handleSchedulingFailure derives the pod's reason from the status code.

---
 pkg/scheduler/framework/interface.go     |  5 +++
 pkg/scheduler/schedule_one.go            | 57 +++++++++++++-----------
 pkg/scheduler/schedule_one_test.go       |  9 ++--
 pkg/scheduler/scheduler.go               |  5 +--
 pkg/scheduler/scheduler_test.go          | 14 +++---
 test/integration/scheduler/queue_test.go |  4 +-
 6 files changed, 48 insertions(+), 46 deletions(-)

diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index 0049e7acb1b..4b4c8a7aef2 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -159,6 +159,11 @@ type Status struct {
 	failedPlugin string
 }
 
+func (s *Status) WithError(err error) *Status {
+	s.err = err
+	return s
+}
+
 // Code returns code of the Status.
 func (s *Status) Code() Code {
 	if s == nil {
diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go
index 4a45f5b4adb..188576a366f 100644
--- a/pkg/scheduler/schedule_one.go
+++ b/pkg/scheduler/schedule_one.go
@@ -92,9 +92,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	scheduleResult, assumedPodInfo, err := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
-	if err != nil {
-		sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, err, scheduleResult.reason, scheduleResult.nominatingInfo, start)
+	scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
+	if !status.IsSuccess() {
+		sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
 		return
 	}
 
@@ -125,7 +125,7 @@ func (sched *Scheduler) schedulingCycle(
 	podInfo *framework.QueuedPodInfo,
 	start time.Time,
 	podsToActivate *framework.PodsToActivate,
-) (ScheduleResult, *framework.QueuedPodInfo, error) {
+) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
 	pod := podInfo.Pod
 	scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
@@ -134,8 +134,10 @@ func (sched *Scheduler) schedulingCycle(
 		// preempt, with the expectation that the next time the pod is tried for scheduling it
 		// will fit due to the preemption. It is also possible that a different pod will schedule
 		// into the resources that were preempted, but this is harmless.
-		var nominatingInfo *framework.NominatingInfo
-		reason := v1.PodReasonUnschedulable
+		var (
+			nominatingInfo *framework.NominatingInfo
+			status         *framework.Status
+		)
 		if fitError, ok := err.(*framework.FitError); ok {
 			if !fwk.HasPostFilterPlugins() {
 				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
 			} else {
@@ -152,15 +154,19 @@ func (sched *Scheduler) schedulingCycle(
 					nominatingInfo = result.NominatingInfo
 				}
 			}
+			status = framework.NewStatus(framework.Unschedulable).WithError(err)
 		} else if err == ErrNoNodesAvailable {
 			nominatingInfo = clearNominatedNode
+			status = framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
 		} else {
 			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
 			nominatingInfo = clearNominatedNode
-			reason = v1.PodReasonSchedulerError
+			status = framework.AsStatus(err)
 		}
-		return ScheduleResult{nominatingInfo: nominatingInfo, reason: reason}, podInfo, err
+
+		return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, status
 	}
+
 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
 	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
 	// This allows us to keep scheduling without waiting on binding to occur.
@@ -174,9 +180,9 @@ func (sched *Scheduler) schedulingCycle(
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		return ScheduleResult{nominatingInfo: clearNominatedNode, reason: v1.PodReasonSchedulerError},
+		return ScheduleResult{nominatingInfo: clearNominatedNode},
 			assumedPodInfo,
-			err
+			framework.AsStatus(err)
 	}
 
 	// Run the Reserve method of reserve plugins.
@@ -187,9 +193,9 @@ func (sched *Scheduler) schedulingCycle(
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 
-		return ScheduleResult{nominatingInfo: clearNominatedNode, reason: v1.PodReasonSchedulerError},
+		return ScheduleResult{nominatingInfo: clearNominatedNode},
 			assumedPodInfo,
-			sts.AsError()
+			sts
 	}
 
 	// Run "permit" plugins.
@@ -201,14 +207,9 @@ func (sched *Scheduler) schedulingCycle(
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 
-		reason := v1.PodReasonSchedulerError
-		if runPermitStatus.IsUnschedulable() {
-			reason = v1.PodReasonUnschedulable
-		}
-
-		return ScheduleResult{nominatingInfo: clearNominatedNode, reason: reason},
+		return ScheduleResult{nominatingInfo: clearNominatedNode},
 			assumedPodInfo,
-			runPermitStatus.AsError()
+			runPermitStatus
 	}
 
 	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
@@ -297,12 +298,7 @@ func (sched *Scheduler) handleBindingCycleError(
 		}
 	}
 
-	reason := v1.PodReasonSchedulerError
-	if status.IsUnschedulable() {
-		reason = v1.PodReasonUnschedulable
-	}
-
-	sched.FailureHandler(ctx, fwk, podInfo, status.AsError(), reason, clearNominatedNode, start)
+	sched.FailureHandler(ctx, fwk, podInfo, status, clearNominatedNode, start)
 }
 
 func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
@@ -833,7 +829,12 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
 
 // handleSchedulingFailure records an event for the pod that indicates the
 // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
-func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo, start time.Time) {
+func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
+	reason := v1.PodReasonSchedulerError
+	if status.IsUnschedulable() {
+		reason = v1.PodReasonUnschedulable
+	}
+
 	switch reason {
 	case v1.PodReasonUnschedulable:
 		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
@@ -842,12 +843,14 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
 	}
 
 	pod := podInfo.Pod
+	err := status.AsError()
 	var errMsg string
 	if err != nil {
 		errMsg = err.Error()
 	}
+
 	if err == ErrNoNodesAvailable {
-		klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
+		klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod), "err", err)
 	} else if fitError, ok := err.(*framework.FitError); ok {
 		// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
 		podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go
index e2dad0ea07a..dc4180d9c41 100644
--- a/pkg/scheduler/schedule_one_test.go
+++ b/pkg/scheduler/schedule_one_test.go
@@ -600,11 +600,11 @@ func TestSchedulerScheduleOne(t *testing.T) {
 			sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
 				return item.mockResult.result, item.mockResult.err
 			}
-			sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo, _ time.Time) {
+			sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
 				gotPod = p.Pod
-				gotError = err
+				gotError = status.AsError()
 
-				msg := truncateMessage(err.Error())
+				msg := truncateMessage(gotError.Error())
 				fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
 			}
 			called := make(chan struct{})
@@ -2689,7 +2689,8 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 	}
 
 	sched.SchedulePod = sched.schedulePod
-	sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo, _ time.Time) {
+	sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
+		err := status.AsError()
 		errChan <- err
 
 		msg := truncateMessage(err.Error())
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 581c792a7cf..03bd947a325 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -132,9 +132,6 @@ type ScheduleResult struct {
 	EvaluatedNodes int
 	// The number of nodes out of the evaluated ones that fit the pod.
 	FeasibleNodes int
-
-	// The reason records the failure in scheduling cycle.
-	reason string
 	// The nominating info for scheduling cycle.
 	nominatingInfo *framework.NominatingInfo
 }
@@ -430,7 +427,7 @@ func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.K
 	return fExtenders, nil
 }
 
-type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo, start time.Time)
+type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)
 
 func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
 	gvkMap := make(map[framework.GVK]framework.ActionType)
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 9ae7f3ceca2..6510fa159c1 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -238,25 +238,21 @@ func TestFailureHandler(t *testing.T) {
 
 	tests := []struct {
 		name                       string
-		injectErr                  error
 		podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
 		podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
 		expect                     *v1.Pod
 	}{
 		{
 			name:                       "pod is updated during a scheduling cycle",
-			injectErr:                  nil,
 			podUpdatedDuringScheduling: true,
 			expect:                     testPodUpdated,
 		},
 		{
-			name:      "pod is not updated during a scheduling cycle",
-			injectErr: nil,
-			expect:    testPod,
+			name:   "pod is not updated during a scheduling cycle",
+			expect: testPod,
 		},
 		{
 			name:                       "pod is deleted during a scheduling cycle",
-			injectErr:                  nil,
 			podDeletedDuringScheduling: true,
 			expect:                     nil,
 		},
@@ -294,7 +290,7 @@ func TestFailureHandler(t *testing.T) {
 			}
 
 			testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-			s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil, time.Now())
+			s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable), nil, time.Now())
 
 			var got *v1.Pod
 			if tt.podUpdatedDuringScheduling {
@@ -369,7 +365,7 @@ func TestFailureHandler_NodeNotFound(t *testing.T) {
 			}
 
 			testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-			s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil, time.Now())
+			s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(tt.injectErr), nil, time.Now())
 
 			gotNodes := schedulerCache.Dump().Nodes
 			gotNodeNames := sets.NewString()
@@ -408,7 +404,7 @@ func TestFailureHandler_PodAlreadyBound(t *testing.T) {
 	}
 
 	testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-	s.FailureHandler(ctx, fwk, testPodInfo, fmt.Errorf("binding rejected: timeout"), v1.PodReasonUnschedulable, nil, time.Now())
+	s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(fmt.Errorf("binding rejected: timeout")), nil, time.Now())
 
 	pod := getPodFromPriorityQueue(queue, testPod)
 	if pod != nil {
diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go
index 4227f843028..42b36f048a4 100644
--- a/test/integration/scheduler/queue_test.go
+++ b/test/integration/scheduler/queue_test.go
@@ -233,7 +233,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
 		if fitError == nil {
 			t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
 		}
-		testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil, time.Now())
+		testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, framework.NewStatus(framework.Unschedulable).WithError(fitError), nil, time.Now())
 	}
 
 	// Trigger a NodeTaintChange event.
@@ -409,7 +409,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
 	if fitError == nil {
 		t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
 	}
-	testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil, time.Now())
+	testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, framework.NewStatus(framework.Unschedulable).WithError(fitError), nil, time.Now())
 
 	// Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so
 	// pass a number larger than 1 to move Pod to unschedulablePods.
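
Note for reviewers: the net effect of this refactor is that a single *framework.Status now flows from the scheduling cycle to the FailureHandler, carrying both the failure code and the underlying error, and the pod's reason string (Unschedulable vs. SchedulerError) is derived from the code in exactly one place, handleSchedulingFailure. The standalone Go sketch below illustrates that pattern; it uses a toy Status type as a stand-in for framework.Status (the real type lives in pkg/scheduler/framework, so everything here is illustrative, not the scheduler's actual code):

package main

import (
	"errors"
	"fmt"
)

// Code mirrors the idea of framework.Code.
type Code int

const (
	Success Code = iota
	Unschedulable
	UnschedulableAndUnresolvable
	Error
)

// Status is a toy stand-in for *framework.Status: one value carrying
// both the outcome code and the underlying error.
type Status struct {
	code Code
	err  error
}

func NewStatus(code Code) *Status { return &Status{code: code} }

// WithError attaches the error, matching the helper this patch adds
// to interface.go.
func (s *Status) WithError(err error) *Status {
	s.err = err
	return s
}

// A nil Status counts as success, as in the framework package.
func (s *Status) IsSuccess() bool { return s == nil || s.code == Success }

// Both Unschedulable codes map to the "Unschedulable" pod reason, which
// is why ErrNoNodesAvailable keeps its old reason after this refactor.
func (s *Status) IsUnschedulable() bool {
	return s != nil && (s.code == Unschedulable || s.code == UnschedulableAndUnresolvable)
}

func (s *Status) AsError() error {
	if s.IsSuccess() {
		return nil
	}
	return s.err
}

// handleFailure shows how the patched handleSchedulingFailure derives
// the pod reason from the status code instead of a separate parameter.
func handleFailure(status *Status) {
	reason := "SchedulerError"
	if status.IsUnschedulable() {
		reason = "Unschedulable"
	}
	fmt.Printf("reason=%s err=%v\n", reason, status.AsError())
}

func main() {
	handleFailure(NewStatus(Unschedulable).WithError(errors.New("0/3 nodes are available")))
	handleFailure(NewStatus(Error).WithError(errors.New("cache ForgetPod failed")))
}

With the real framework package, the corresponding call sites are the ones in this diff: framework.NewStatus(framework.Unschedulable).WithError(err) on the unschedulable path, framework.AsStatus(err) for internal errors, and status.IsUnschedulable() / status.AsError() on the consuming side.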