Mirror of https://github.com/k3s-io/kubernetes.git
Refactor schedulingCycle and bindingCycle in scheduler
Signed-off-by: kerthcet <kerthcet@gmail.com>
commit f7f857814f (parent 83415e5c9e)
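To make the shape of the change easier to follow before reading the hunks below: the scheduling cycle now hands its failure back to scheduleOne as an error (with the reason and nominating info carried inside ScheduleResult), and the binding cycle returns a status that the caller routes into a single handleBindingCycleError helper. The self-contained Go program below is only a sketch of that control flow; every type and function in it is a simplified stand-in, not the real kube-scheduler API.

package main

import (
    "errors"
    "fmt"
    "time"
)

// Toy stand-ins for the scheduler pieces touched by this commit.
type scheduleResult struct {
    reason         string // failure reason recorded by the scheduling cycle
    nominatingInfo string // stand-in for *framework.NominatingInfo
}

type status struct{ err error }

func (s status) IsSuccess() bool { return s.err == nil }

// schedulingCycle now reports failure through an error return instead of
// calling the failure handler itself; reason/nominating info ride along in
// the result.
func schedulingCycle() (scheduleResult, error) {
    return scheduleResult{reason: "SchedulerError"}, errors.New("no feasible node")
}

// bindingCycle now returns a status; cleanup and failure reporting move into
// a separate handleBindingCycleError step owned by the caller.
func bindingCycle(scheduleResult) status { return status{} }

func failureHandler(err error, reason, nominatingInfo string, start time.Time) {
    fmt.Printf("failed after %v: %v (reason=%s, nominating=%q)\n", time.Since(start), err, reason, nominatingInfo)
}

func handleBindingCycleError(st status, r scheduleResult, start time.Time) {
    failureHandler(st.err, "SchedulerError", r.nominatingInfo, start)
}

func scheduleOne() {
    start := time.Now()
    result, err := schedulingCycle()
    if err != nil {
        // The caller, not the cycle, now invokes the failure handler.
        failureHandler(err, result.reason, result.nominatingInfo, start)
        return
    }
    if st := bindingCycle(result); !st.IsSuccess() {
        handleBindingCycleError(st, result, start)
    }
}

func main() { scheduleOne() }

In the real code this same shape appears in the scheduleOne hunks below, with sched.FailureHandler and the framework types in place of the toys.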
@@ -91,8 +91,10 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 
     schedulingCycleCtx, cancel := context.WithCancel(ctx)
     defer cancel()
-    scheduleResult, assumedPodInfo := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, podsToActivate, start)
-    if scheduleResult.FeasibleNodes == 0 {
+
+    scheduleResult, assumedPodInfo, err := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
+    if err != nil {
+        sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, err, scheduleResult.reason, scheduleResult.nominatingInfo, start)
         return
     }
 
@@ -106,14 +108,25 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
         metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
         defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
 
-        sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, podsToActivate, start)
+        status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
+        if !status.IsSuccess() {
+            sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
+        }
     }()
 }
 
 var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
 
 // schedulingCycle tries to schedule a single Pod.
-func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, podInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) (ScheduleResult, *framework.QueuedPodInfo) {
+func (sched *Scheduler) schedulingCycle(
+    ctx context.Context,
+    state *framework.CycleState,
+    fwk framework.Framework,
+    podInfo *framework.QueuedPodInfo,
+    start time.Time,
+    podsToActivate *framework.PodsToActivate,
+) (ScheduleResult, *framework.QueuedPodInfo, error) {
+
     pod := podInfo.Pod
     scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
     if err != nil {
@@ -139,22 +152,14 @@ func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.Cy
                     nominatingInfo = result.NominatingInfo
                 }
             }
-            // Pod did not fit anywhere, so it is counted as a failure. If preemption
-            // succeeds, the pod should get counted as a success the next time we try to
-            // schedule it. (hopefully)
-            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
         } else if err == ErrNoNodesAvailable {
             nominatingInfo = clearNominatedNode
-            // No nodes available is counted as unschedulable rather than an error.
-            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
         } else {
-            nominatingInfo = clearNominatedNode
             klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
-            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
+            nominatingInfo = clearNominatedNode
             reason = v1.PodReasonSchedulerError
         }
-        sched.FailureHandler(ctx, fwk, podInfo, err, reason, nominatingInfo)
-        return ScheduleResult{}, nil
+        return ScheduleResult{nominatingInfo: nominatingInfo, reason: reason}, podInfo, err
     }
     metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
     // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
@@ -164,14 +169,14 @@ func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.Cy
     // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
     err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
     if err != nil {
-        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
         // This is most probably result of a BUG in retrying logic.
         // We report an error here so that pod scheduling can be retried.
         // This relies on the fact that Error will check if the pod has been bound
         // to a node and if so will not add it back to the unscheduled pods queue
         // (otherwise this would cause an infinite loop).
-        sched.FailureHandler(ctx, fwk, assumedPodInfo, err, v1.PodReasonSchedulerError, clearNominatedNode)
-        return ScheduleResult{}, nil
+        return ScheduleResult{nominatingInfo: clearNominatedNode, reason: v1.PodReasonSchedulerError},
+            assumedPodInfo,
+            err
     }
 
     // Run the Reserve method of reserve plugins.
@@ -182,31 +187,28 @@ func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.Cy
             klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
         }
 
-        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-        sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), v1.PodReasonSchedulerError, clearNominatedNode)
-        return ScheduleResult{}, nil
+        return ScheduleResult{nominatingInfo: clearNominatedNode, reason: v1.PodReasonSchedulerError},
+            assumedPodInfo,
+            sts.AsError()
     }
 
     // Run "permit" plugins.
     runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
     if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
-        // One of the plugins returned status different from success or wait.
+        // trigger un-reserve to clean up state associated with the reserved Pod
         fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
         if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
             klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
         }
 
-        var reason string
+        reason := v1.PodReasonSchedulerError
         if runPermitStatus.IsUnschedulable() {
-            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
             reason = v1.PodReasonUnschedulable
-        } else {
-            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-            reason = v1.PodReasonSchedulerError
         }
 
-        sched.FailureHandler(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
-        return ScheduleResult{}, nil
+        return ScheduleResult{nominatingInfo: clearNominatedNode, reason: reason},
+            assumedPodInfo,
+            runPermitStatus.AsError()
     }
 
     // At the end of a successful scheduling cycle, pop and move up Pods if needed.
@@ -216,111 +218,36 @@ func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.Cy
         podsToActivate.Map = make(map[string]*v1.Pod)
     }
 
-    return scheduleResult, assumedPodInfo
+    return scheduleResult, assumedPodInfo, nil
 }
 
 // bindingCycle tries to bind an assumed Pod.
-func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, scheduleResult ScheduleResult, assumedPodInfo *framework.QueuedPodInfo, podsToActivate *framework.PodsToActivate, start time.Time) {
+func (sched *Scheduler) bindingCycle(
+    ctx context.Context,
+    state *framework.CycleState,
+    fwk framework.Framework,
+    scheduleResult ScheduleResult,
+    assumedPodInfo *framework.QueuedPodInfo,
+    start time.Time,
+    podsToActivate *framework.PodsToActivate) *framework.Status {
+
     assumedPod := assumedPodInfo.Pod
 
-    waitOnPermitStatus := fwk.WaitOnPermit(ctx, assumedPod)
-    if !waitOnPermitStatus.IsSuccess() {
-        // trigger un-reserve plugins to clean up state associated with the reserved Pod
-        fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
-        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
-            klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
-        } else {
-            // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-            // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-            // TODO(#103853): de-duplicate the logic.
-            // Avoid moving the assumed Pod itself as it's always Unschedulable.
-            // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
-            // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
-            if waitOnPermitStatus.IsUnschedulable() {
-                defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
-                    return assumedPod.UID != pod.UID
-                })
-            } else {
-                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
-            }
-        }
-
-        var reason string
-        if waitOnPermitStatus.IsUnschedulable() {
-            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
-            reason = v1.PodReasonUnschedulable
-        } else {
-            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-            reason = v1.PodReasonSchedulerError
-        }
-        sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
-        return
+    // Run "permit" plugins.
+    if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
+        return status
     }
 
     // Run "prebind" plugins.
-    preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
-    if !preBindStatus.IsSuccess() {
-        // trigger un-reserve plugins to clean up state associated with the reserved Pod
-        fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
-        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
-            klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
-        } else {
-            // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-            // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-            // TODO(#103853): de-duplicate the logic.
-            if preBindStatus.IsUnschedulable() {
-                defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
-                    return assumedPod.UID != pod.UID
-                })
-            } else {
-                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
-            }
-        }
-
-        var reason string
-        if preBindStatus.IsUnschedulable() {
-            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
-            reason = v1.PodReasonUnschedulable
-        } else {
-            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-            reason = v1.PodReasonSchedulerError
-        }
-
-        sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), reason, clearNominatedNode)
-        return
+    if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
+        return status
     }
 
-    bindStatus := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
-    if !bindStatus.IsSuccess() {
-        // trigger un-reserve plugins to clean up state associated with the reserved Pod
-        fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
-        if err := sched.Cache.ForgetPod(assumedPod); err != nil {
-            klog.ErrorS(err, "scheduler cache ForgetPod failed")
-        } else {
-            // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
-            // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
-            // TODO(#103853): de-duplicate the logic.
-            if bindStatus.IsUnschedulable() {
-                defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
-                    return assumedPod.UID != pod.UID
-                })
-            } else {
-                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
-            }
-        }
-
-        var reason string
-        if bindStatus.IsUnschedulable() {
-            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
-            reason = v1.PodReasonUnschedulable
-        } else {
-            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
-            reason = v1.PodReasonSchedulerError
-        }
-
-        sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", bindStatus.AsError()), reason, clearNominatedNode)
-        return
+    // Run "bind" plugins.
+    if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
+        return status
     }
 
     // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
     klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
     metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
@@ -336,6 +263,46 @@ func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.Cycle
         // Unlike the logic in schedulingCycle(), we don't bother deleting the entries
         // as `podsToActivate.Map` is no longer consumed.
     }
+
+    return nil
+}
+
+func (sched *Scheduler) handleBindingCycleError(
+    ctx context.Context,
+    state *framework.CycleState,
+    fwk framework.Framework,
+    podInfo *framework.QueuedPodInfo,
+    start time.Time,
+    scheduleResult ScheduleResult,
+    status *framework.Status) {
+
+    assumedPod := podInfo.Pod
+    // trigger un-reserve plugins to clean up state associated with the reserved Pod
+    fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+    if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
+        klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
+    } else {
+        // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
+        // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
+        //
+        // Avoid moving the assumed Pod itself as it's always Unschedulable.
+        // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
+        // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
+        if status.IsUnschedulable() {
+            defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
+                return assumedPod.UID != pod.UID
+            })
+        } else {
+            sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
+        }
+    }
+
+    reason := v1.PodReasonSchedulerError
+    if status.IsUnschedulable() {
+        reason = v1.PodReasonUnschedulable
+    }
+
+    sched.FailureHandler(ctx, fwk, podInfo, status.AsError(), reason, clearNominatedNode, start)
 }
 
 func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
@@ -870,7 +837,14 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
 
 // handleSchedulingFailure records an event for the pod that indicates the
 // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
-func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
+func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo, start time.Time) {
+    switch reason {
+    case v1.PodReasonUnschedulable:
+        metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
+    case v1.PodReasonSchedulerError:
+        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
+    }
+
     pod := podInfo.Pod
     var errMsg string
     if err != nil {
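A minimal illustration of the metric move in the hunk above: recording now happens once, inside the failure handler, keyed on the reason and the cycle start time. The snippet uses simplified stand-ins (plain strings and print-style observers) rather than the real v1 reason constants and the scheduler metrics package.

package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins: the real code switches on v1.PodReasonUnschedulable /
// v1.PodReasonSchedulerError and calls metrics.PodUnschedulable /
// metrics.PodScheduleError with metrics.SinceInSeconds(start).
const (
    reasonUnschedulable  = "Unschedulable"
    reasonSchedulerError = "SchedulerError"
)

func observePodUnschedulable(seconds float64) { fmt.Println("PodUnschedulable", seconds) }
func observePodScheduleError(seconds float64) { fmt.Println("PodScheduleError", seconds) }

// recordFailureMetric mirrors the switch added to handleSchedulingFailure:
// the handler receives the cycle start time and picks the metric from the
// reason, so the individual failure paths no longer record metrics themselves.
func recordFailureMetric(reason string, start time.Time) {
    switch reason {
    case reasonUnschedulable:
        observePodUnschedulable(time.Since(start).Seconds())
    case reasonSchedulerError:
        observePodScheduleError(time.Since(start).Seconds())
    }
}

func main() {
    recordFailureMetric(reasonSchedulerError, time.Now())
}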
@@ -527,7 +527,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
             expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
             expectAssumedPod: podWithID("foo", testNode.Name),
             injectBindError:  errB,
-            expectError:      fmt.Errorf(`binding rejected: %w`, fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", errors.New("binder"))),
+            expectError:      fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", errors.New("binder")),
             expectErrorPod:   podWithID("foo", testNode.Name),
             expectForgetPod:  podWithID("foo", testNode.Name),
             eventReason:      "FailedScheduling",
@@ -597,7 +597,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
             sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
                 return item.mockResult.result, item.mockResult.err
             }
-            sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) {
+            sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo, _ time.Time) {
                 gotPod = p.Pod
                 gotError = err
 
@@ -2680,7 +2680,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
     }
 
     sched.SchedulePod = sched.schedulePod
-    sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) {
+    sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo, _ time.Time) {
         errChan <- err
 
         msg := truncateMessage(err.Error())
@@ -132,6 +132,11 @@ type ScheduleResult struct {
     EvaluatedNodes int
     // The number of nodes out of the evaluated ones that fit the pod.
     FeasibleNodes int
+
+    // The reason records the failure in scheduling cycle.
+    reason string
+    // The nominating info for scheduling cycle.
+    nominatingInfo *framework.NominatingInfo
 }
 
 // WithComponentConfigVersion sets the component config version to the
@@ -420,7 +425,7 @@ func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.K
     return fExtenders, nil
 }
 
-type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo)
+type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo, start time.Time)
 
 func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
     gvkMap := make(map[framework.GVK]framework.ActionType)
@@ -294,7 +294,7 @@ func TestFailureHandler(t *testing.T) {
             }
 
             testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-            s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil)
+            s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil, time.Now())
 
             var got *v1.Pod
             if tt.podUpdatedDuringScheduling {
@@ -369,7 +369,7 @@ func TestFailureHandler_NodeNotFound(t *testing.T) {
             }
 
             testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-            s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil)
+            s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil, time.Now())
 
             gotNodes := schedulerCache.Dump().Nodes
             gotNodeNames := sets.NewString()
@@ -408,7 +408,7 @@ func TestFailureHandler_PodAlreadyBound(t *testing.T) {
     }
 
     testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
-    s.FailureHandler(ctx, fwk, testPodInfo, fmt.Errorf("binding rejected: timeout"), v1.PodReasonUnschedulable, nil)
+    s.FailureHandler(ctx, fwk, testPodInfo, fmt.Errorf("binding rejected: timeout"), v1.PodReasonUnschedulable, nil, time.Now())
 
     pod := getPodFromPriorityQueue(queue, testPod)
     if pod != nil {
@@ -106,7 +106,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
         if fitError == nil {
            t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
         }
-        testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil)
+        testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil, time.Now())
     }
 
     // Trigger a NodeTaintChange event.
@@ -282,7 +282,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
     if fitError == nil {
         t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
     }
-    testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil)
+    testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil, time.Now())
 
     // Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so
     // pass a number larger than 1 to move Pod to unschedulablePods.
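Because FailureHandlerFn gained a trailing start time.Time parameter, every override and call site has to thread a start time through, which is what the time.Now() additions in the test hunks above are doing. Below is a small self-contained sketch of that mechanical change, using toy types rather than the real framework package.

package main

import (
    "errors"
    "fmt"
    "time"
)

// Toy version of the widened handler type; the real FailureHandlerFn also
// takes the framework, queued pod info, reason and nominating info.
type failureHandlerFn func(err error, reason string, start time.Time)

type scheduler struct {
    failureHandler failureHandlerFn
}

func main() {
    s := scheduler{
        // Overrides (for example in tests) must now accept the extra start
        // argument, even if they ignore it.
        failureHandler: func(err error, reason string, _ time.Time) {
            fmt.Println("failure:", reason, err)
        },
    }
    // Callers thread the cycle start time through, as the tests above do
    // with time.Now().
    s.failureHandler(errors.New("binder"), "SchedulerError", time.Now())
}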