From 1538bbd73deeec046c0443cac46a969a4978b16d Mon Sep 17 00:00:00 2001
From: Kensei Nakada
Date: Wed, 10 Aug 2022 05:11:36 +0000
Subject: [PATCH] fix(scheduler): split scheduleOne into two functions for
 schedulingCycle and bindingCycle

---
 pkg/scheduler/schedule_one.go | 200 +++++++++++++++++++---------------
 1 file changed, 110 insertions(+), 90 deletions(-)

diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go
index 9d339048c02..9f3a170baad 100644
--- a/pkg/scheduler/schedule_one.go
+++ b/pkg/scheduler/schedule_one.go
@@ -61,8 +61,6 @@ const (
 	minFeasibleNodesPercentageToFind = 5
 )
 
-var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
-
 // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
 func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	podInfo := sched.NextPod()
@@ -88,13 +86,36 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	start := time.Now()
 	state := framework.NewCycleState()
 	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
+
 	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
 	podsToActivate := framework.NewPodsToActivate()
 	state.Write(framework.PodsToActivateKey, podsToActivate)
 
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
+	scheduleResult, assumedPodInfo := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, podsToActivate, start)
+	if scheduleResult.FeasibleNodes == 0 {
+		return
+	}
+
+	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
+	go func() {
+		bindingCycleCtx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
+		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
+
+		sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start)
+	}()
+}
+
+var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
+
+// schedulingCycle tries to schedule a single Pod.
+func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, podInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) (ScheduleResult, *framework.QueuedPodInfo) {
+	pod := podInfo.Pod
+	scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
 	if err != nil {
 		// SchedulePod() may have failed because the pod would not fit on any host, so we try to
 		// preempt, with the expectation that the next time the pod is tried for scheduling it
@@ -131,7 +152,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		}
 		sched.FailureHandler(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
-		return
+		return ScheduleResult{}, nil
 	}
 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
 	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
@@ -148,23 +169,23 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
 		sched.FailureHandler(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
-		return
+		return ScheduleResult{}, nil
 	}
 
 	// Run the Reserve method of reserve plugins.
-	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
+	if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
 		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		// trigger un-reserve to clean up state associated with the reserved Pod
-		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 		sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
-		return
+		return ScheduleResult{}, nil
 	}
 
 	// Run "permit" plugins.
-	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+	runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 	if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
 		var reason string
 		if runPermitStatus.IsUnschedulable() {
@@ -175,12 +196,12 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			reason = SchedulerError
 		}
 		// One of the plugins returned status different than success or wait.
-		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 		sched.FailureHandler(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
-		return
+		return ScheduleResult{}, nil
 	}
 
 	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
@@ -190,92 +211,91 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		podsToActivate.Map = make(map[string]*v1.Pod)
 	}
 
-	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
-	go func() {
-		bindingCycleCtx, cancel := context.WithCancel(ctx)
-		defer cancel()
-		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
-		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
+	return scheduleResult, assumedPodInfo
+}
 
-		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
-		if !waitOnPermitStatus.IsSuccess() {
-			var reason string
-			if waitOnPermitStatus.IsUnschedulable() {
-				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
-				reason = v1.PodReasonUnschedulable
-			} else {
-				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-				reason = SchedulerError
-			}
-			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
-				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
-			} else {
-				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-				// TODO(#103853): de-duplicate the logic.
-				// Avoid moving the assumed Pod itself as it's always Unschedulable.
-				// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
-				// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
-				defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
-					return assumedPod.UID != pod.UID
-				})
-			}
-			sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
-			return
-		}
+// bindingCycle tries to bind an assumed Pod.
+func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, scheduleResult ScheduleResult, assumedPodInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) {
+	assumedPod := assumedPodInfo.Pod
 
-		// Run "prebind" plugins.
-		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-		if !preBindStatus.IsSuccess() {
+	waitOnPermitStatus := fwk.WaitOnPermit(ctx, assumedPod)
+	if !waitOnPermitStatus.IsSuccess() {
+		var reason string
+		if waitOnPermitStatus.IsUnschedulable() {
+			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
+			reason = v1.PodReasonUnschedulable
+		} else {
 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
-				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
-			} else {
-				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-				// TODO(#103853): de-duplicate the logic.
-				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
-			}
-			sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
-			return
+			reason = SchedulerError
 		}
-
-		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
-		if err != nil {
-			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if err := sched.Cache.ForgetPod(assumedPod); err != nil {
-				klog.ErrorS(err, "scheduler cache ForgetPod failed")
-			} else {
-				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-				// TODO(#103853): de-duplicate the logic.
-				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
-			}
-			sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
-			return
+		// trigger un-reserve plugins to clean up state associated with the reserved Pod
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
+			klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
+		} else {
+			// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+			// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+			// TODO(#103853): de-duplicate the logic.
+			// Avoid moving the assumed Pod itself as it's always Unschedulable.
+			// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
+			// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
+			defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
+				return assumedPod.UID != pod.UID
+			})
 		}
-		// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
-		klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
-		metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
-		metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
-		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
+		sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
+		return
+	}
 
-		// Run "postbind" plugins.
-		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-
-		// At the end of a successful binding cycle, move up Pods if needed.
-		if len(podsToActivate.Map) != 0 {
-			sched.SchedulingQueue.Activate(podsToActivate.Map)
-			// Unlike the logic in scheduling cycle, we don't bother deleting the entries
-			// as `podsToActivate.Map` is no longer consumed.
+	// Run "prebind" plugins.
+	preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+	if !preBindStatus.IsSuccess() {
+		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
+		// trigger un-reserve plugins to clean up state associated with the reserved Pod
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
+			klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
+		} else {
+			// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+			// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+			// TODO(#103853): de-duplicate the logic.
+			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
 		}
-	}()
+		sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
+		return
+	}
+
+	err := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
+	if err != nil {
+		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
+		// trigger un-reserve plugins to clean up state associated with the reserved Pod
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+		if err := sched.Cache.ForgetPod(assumedPod); err != nil {
+			klog.ErrorS(err, "scheduler cache ForgetPod failed")
+		} else {
+			// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+			// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+			// TODO(#103853): de-duplicate the logic.
+			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
+		}
+		sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
+		return
+	}
+	// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
+	klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
+	metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
+	metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
+	metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(assumedPodInfo.InitialAttemptTimestamp))
+
+	// Run "postbind" plugins.
+	fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+
+	// At the end of a successful binding cycle, move up Pods if needed.
+	if len(podsToActivate.Map) != 0 {
+		sched.SchedulingQueue.Activate(podsToActivate.Map)
+		// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
+		// as `podsToActivate.Map` is no longer consumed.
+	}
 }
 
 func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
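
---

Note (not part of the patch): the shape of the refactor is easy to lose in the hunk interleaving above, so here is a minimal, runnable Go sketch of the resulting control flow. Everything below is an illustrative stand-in (the scheduler, scheduleResult, and pod types are simplified placeholders, not the real framework types); only the structure mirrors the patch: a synchronous schedulingCycle, an early return when FeasibleNodes is zero because failures are already handled inside the cycle, and an asynchronous bindingCycle goroutine.

// Illustrative sketch only; names and types are stand-ins for the real
// kube-scheduler types, which this patch does not change in this way.
package main

import (
	"context"
	"fmt"
	"sync"
)

// scheduleResult stands in for ScheduleResult in the patch.
type scheduleResult struct {
	suggestedHost string
	feasibleNodes int
}

type scheduler struct {
	wg sync.WaitGroup // stands in for the binding-goroutine bookkeeping/metrics
}

// schedulingCycle runs synchronously: it selects a host and "assumes" the pod
// on it in the cache, so the next pod's scheduling can start immediately.
func (s *scheduler) schedulingCycle(ctx context.Context, pod string) scheduleResult {
	// Host selection, the assume step, Reserve, and Permit would happen here;
	// on failure this returns the zero value (feasibleNodes == 0).
	return scheduleResult{suggestedHost: "node-1", feasibleNodes: 1}
}

// bindingCycle runs in its own goroutine: WaitOnPermit, PreBind, Bind, and
// PostBind would happen here in the real scheduler.
func (s *scheduler) bindingCycle(ctx context.Context, pod string, r scheduleResult) {
	fmt.Printf("bound %s to %s\n", pod, r.suggestedHost)
}

// scheduleOne mirrors the structure after the patch: run the scheduling cycle,
// bail out if it failed (it has already reported the failure), then bind
// asynchronously because the pod is already assumed in the cache.
func (s *scheduler) scheduleOne(ctx context.Context, pod string) {
	result := s.schedulingCycle(ctx, pod)
	if result.feasibleNodes == 0 {
		return
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		bindingCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		s.bindingCycle(bindingCtx, pod, result)
	}()
}

func main() {
	s := &scheduler{}
	s.scheduleOne(context.Background(), "pod-a")
	s.wg.Wait()
}

The design point the split preserves is the one stated in the patch's own comment: binding can run off the scheduling goroutine precisely because the assume step has already reserved the pod's resources in the scheduler cache.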