Merge pull request #111775 from sanposhiho/fix-cotext-leak

fix(scheduler): split scheduleOne into two functions for schedulingCycle and bindingCycle
Kubernetes Prow Robot 2022-08-23 19:00:29 -07:00 committed by GitHub
commit 5028dcb4ca
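Before the diff, a condensed view of what this commit does: the monolithic scheduleOne becomes a thin orchestrator that runs the scheduling cycle synchronously and, on success, hands off to a binding cycle on its own goroutine, each under its own child context derived from the parent ctx. The sketch below models that control flow with simplified stand-in types; schedule, bind, pod, and result here are illustrative, not the scheduler's real API.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Illustrative stand-ins for the scheduler's types.
type pod struct{ name string }

type result struct {
    host     string
    feasible int
}

// schedulingCycle analogue: synchronous, returns a zero result on failure.
func schedule(ctx context.Context, p pod) result {
    _ = ctx // a real implementation would honor cancellation
    return result{host: "node-a", feasible: 1}
}

// bindingCycle analogue: runs on its own goroutine with its own child context.
func bind(ctx context.Context, p pod, r result) {
    select {
    case <-time.After(10 * time.Millisecond): // pretend API call
        fmt.Printf("bound %s to %s\n", p.name, r.host)
    case <-ctx.Done():
        fmt.Printf("binding of %s canceled: %v\n", p.name, ctx.Err())
    }
}

func scheduleOne(ctx context.Context, wg *sync.WaitGroup, p pod) {
    schedCtx, cancel := context.WithCancel(ctx)
    defer cancel() // only the scheduling cycle's context dies with this call

    r := schedule(schedCtx, p)
    if r.feasible == 0 {
        return // failure already handled inside the scheduling cycle
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        // As in the diff below, the binding context is derived from the
        // parent ctx, not schedCtx, so the deferred cancel above cannot
        // cut the binding short.
        bindCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        bind(bindCtx, p, r)
    }()
}

func main() {
    var wg sync.WaitGroup
    scheduleOne(context.Background(), &wg, pod{name: "nginx"})
    wg.Wait() // the WaitGroup exists only so this demo waits for the binding
}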

@@ -61,8 +61,6 @@ const (
 	minFeasibleNodesPercentageToFind = 5
 )
 
-var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
-
 // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
 func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	podInfo := sched.NextPod()
@@ -88,13 +86,36 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	start := time.Now()
 	state := framework.NewCycleState()
 	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
 
 	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
 	podsToActivate := framework.NewPodsToActivate()
 	state.Write(framework.PodsToActivateKey, podsToActivate)
 
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
+
+	scheduleResult, assumedPodInfo := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, podsToActivate, start)
+	if scheduleResult.FeasibleNodes == 0 {
+		return
+	}
+
+	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
+	go func() {
+		bindingCycleCtx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
+		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
+
+		sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start)
+	}()
+}
+
+var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
+
+// schedulingCycle tries to schedule a single Pod.
+func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, podInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) (ScheduleResult, *framework.QueuedPodInfo) {
+	pod := podInfo.Pod
+	scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
 	if err != nil {
 		// SchedulePod() may have failed because the pod would not fit on any host, so we try to
 		// preempt, with the expectation that the next time the pod is tried for scheduling it
@@ -131,7 +152,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		}
 		sched.FailureHandler(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
-		return
+		return ScheduleResult{}, nil
 	}
 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
 	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
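The comment closing this hunk refers to the scheduler's "assume" optimization: the pod is recorded in the in-memory cache as if it were already running on the chosen node, which is what makes it safe to perform the actual API bind asynchronously. A minimal sketch of that idea under illustrative names; this is not the real internal cache API.

package main

import (
    "fmt"
    "sync"
)

// assumeCache is an illustrative reduction of the scheduler cache's
// assume/forget behavior: Assume records a placement optimistically so
// later scheduling cycles see the node's resources as taken; Forget
// rolls the placement back when binding fails.
type assumeCache struct {
    mu      sync.Mutex
    assumed map[string]string // pod name -> node name
}

func newAssumeCache() *assumeCache {
    return &assumeCache{assumed: make(map[string]string)}
}

func (c *assumeCache) Assume(pod, node string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.assumed[pod] = node
}

func (c *assumeCache) Forget(pod string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.assumed, pod)
}

func main() {
    cache := newAssumeCache()
    cache.Assume("nginx", "node-a") // scheduling cycle: optimistic placement
    fmt.Println(cache.assumed)      // map[nginx:node-a]
    cache.Forget("nginx")           // binding cycle failed: roll back
    fmt.Println(cache.assumed)      // map[]
}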
@@ -148,23 +169,23 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
 		sched.FailureHandler(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
-		return
+		return ScheduleResult{}, nil
 	}
 
 	// Run the Reserve method of reserve plugins.
-	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
+	if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
 		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		// trigger un-reserve to clean up state associated with the reserved Pod
-		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 		sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
-		return
+		return ScheduleResult{}, nil
 	}
 
 	// Run "permit" plugins.
-	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+	runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 	if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
 		var reason string
 		if runPermitStatus.IsUnschedulable() {
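The permit step above is the one place where the scheduling cycle can hand the binding goroutine a pod that is not yet cleared to bind: RunPermitPlugins may return a Wait status here, and bindingCycle (in the last hunk below) then blocks in WaitOnPermit until a plugin allows or rejects the pod. A minimal sketch of that handshake, using a hypothetical waiter type rather than the framework's real waiting-pod machinery:

package main

import (
    "context"
    "fmt"
    "time"
)

// permitWaiter is a hypothetical reduction of the framework's waiting-pod
// mechanism: the scheduling cycle parks a decision channel, and the binding
// goroutine blocks on it in a WaitOnPermit-style call.
type permitWaiter struct {
    decision chan error // nil = allow, non-nil = reject
}

func newPermitWaiter() *permitWaiter {
    return &permitWaiter{decision: make(chan error, 1)}
}

func (w *permitWaiter) Allow()           { w.decision <- nil }
func (w *permitWaiter) Reject(err error) { w.decision <- err }

// Wait blocks until a plugin decides, or the binding context ends.
func (w *permitWaiter) Wait(ctx context.Context) error {
    select {
    case err := <-w.decision:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    w := newPermitWaiter()
    go func() {
        time.Sleep(10 * time.Millisecond) // e.g. waiting for a gang of pods
        w.Allow()
    }()
    if err := w.Wait(context.Background()); err != nil {
        fmt.Println("rejected:", err)
        return
    }
    fmt.Println("permitted: proceed to prebind and bind")
}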
@@ -175,12 +196,12 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			reason = SchedulerError
 		}
 		// One of the plugins returned status different than success or wait.
-		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 		sched.FailureHandler(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
-		return
+		return ScheduleResult{}, nil
 	}
 
 	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
@@ -190,92 +211,91 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		podsToActivate.Map = make(map[string]*v1.Pod)
 	}
 
-	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
-	go func() {
-		bindingCycleCtx, cancel := context.WithCancel(ctx)
-		defer cancel()
-		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
-		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
-		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
-		if !waitOnPermitStatus.IsSuccess() {
-			var reason string
-			if waitOnPermitStatus.IsUnschedulable() {
-				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
-				reason = v1.PodReasonUnschedulable
-			} else {
-				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-				reason = SchedulerError
-			}
-			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
-				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
-			} else {
-				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-				// TODO(#103853): de-duplicate the logic.
-				// Avoid moving the assumed Pod itself as it's always Unschedulable.
-				// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
-				// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
-				defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
-					return assumedPod.UID != pod.UID
-				})
-			}
-			sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
-			return
-		}
-		// Run "prebind" plugins.
-		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-		if !preBindStatus.IsSuccess() {
-			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
-				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
-			} else {
-				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-				// TODO(#103853): de-duplicate the logic.
-				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
-			}
-			sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
-			return
-		}
-		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
-		if err != nil {
-			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-			// trigger un-reserve plugins to clean up state associated with the reserved Pod
-			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			if err := sched.Cache.ForgetPod(assumedPod); err != nil {
-				klog.ErrorS(err, "scheduler cache ForgetPod failed")
-			} else {
-				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-				// TODO(#103853): de-duplicate the logic.
-				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
-			}
-			sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
-			return
-		}
-		// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
-		klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
-		metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
-		metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
-		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
-		// Run "postbind" plugins.
-		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-		// At the end of a successful binding cycle, move up Pods if needed.
-		if len(podsToActivate.Map) != 0 {
-			sched.SchedulingQueue.Activate(podsToActivate.Map)
-			// Unlike the logic in scheduling cycle, we don't bother deleting the entries
-			// as `podsToActivate.Map` is no longer consumed.
-		}
-	}()
+	return scheduleResult, assumedPodInfo
+}
+
+// bindingCycle tries to bind an assumed Pod.
+func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, scheduleResult ScheduleResult, assumedPodInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) {
+	assumedPod := assumedPodInfo.Pod
+
+	waitOnPermitStatus := fwk.WaitOnPermit(ctx, assumedPod)
+	if !waitOnPermitStatus.IsSuccess() {
+		var reason string
+		if waitOnPermitStatus.IsUnschedulable() {
+			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
+			reason = v1.PodReasonUnschedulable
+		} else {
+			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
+			reason = SchedulerError
+		}
+		// trigger un-reserve plugins to clean up state associated with the reserved Pod
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
+			klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
+		} else {
+			// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+			// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+			// TODO(#103853): de-duplicate the logic.
+			// Avoid moving the assumed Pod itself as it's always Unschedulable.
+			// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
+			// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
+			defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
+				return assumedPod.UID != pod.UID
+			})
+		}
+		sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
+		return
+	}
+
+	// Run "prebind" plugins.
+	preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+	if !preBindStatus.IsSuccess() {
+		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
+		// trigger un-reserve plugins to clean up state associated with the reserved Pod
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
+			klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
+		} else {
+			// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+			// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+			// TODO(#103853): de-duplicate the logic.
+			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
+		}
+		sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
+		return
+	}
+
+	err := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
+	if err != nil {
+		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
+		// trigger un-reserve plugins to clean up state associated with the reserved Pod
+		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+		if err := sched.Cache.ForgetPod(assumedPod); err != nil {
+			klog.ErrorS(err, "scheduler cache ForgetPod failed")
+		} else {
+			// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+			// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+			// TODO(#103853): de-duplicate the logic.
+			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
+		}
+		sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
+		return
+	}
+
+	// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
+	klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
+	metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
+	metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
+	metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(assumedPodInfo.InitialAttemptTimestamp))
+
+	// Run "postbind" plugins.
+	fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+
+	// At the end of a successful binding cycle, move up Pods if needed.
+	if len(podsToActivate.Map) != 0 {
+		sched.SchedulingQueue.Activate(podsToActivate.Map)
+		// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
+		// as `podsToActivate.Map` is no longer consumed.
+	}
 }
 
 func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
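One detail worth noting in the split: schedulingCycle reports failure through its return value rather than a separate error. Every failure path above returns ScheduleResult{}, nil, and scheduleOne treats a zero FeasibleNodes count as "do not start the binding cycle". A minimal, self-contained sketch of that sentinel-style contract; the types and names here are simplified stand-ins, not the scheduler's:

package main

import "fmt"

// Simplified stand-in for the scheduler's ScheduleResult.
type ScheduleResult struct {
    SuggestedHost string
    FeasibleNodes int
}

// trySchedule mimics schedulingCycle's contract: every failure path
// returns a zero ScheduleResult (and nil pod info), so the caller only
// has to check FeasibleNodes to decide whether to start binding.
func trySchedule(pod string, nodes []string) (ScheduleResult, *string) {
    if len(nodes) == 0 {
        return ScheduleResult{}, nil // sentinel: nothing feasible, errors already handled
    }
    return ScheduleResult{SuggestedHost: nodes[0], FeasibleNodes: len(nodes)}, &pod
}

func main() {
    result, assumed := trySchedule("nginx", []string{"node-a", "node-b"})
    if result.FeasibleNodes == 0 {
        return // mirrors scheduleOne: skip the binding cycle entirely
    }
    fmt.Printf("would bind %s to %s\n", *assumed, result.SuggestedHost)
}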