clear pod's .status.nominatedNodeName when necessary

Author: Wei Huang
Date:   2021-12-16 10:54:58 -08:00
parent eb43b41cfd
commit 2433b083a9
12 changed files with 343 additions and 205 deletions

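This change threads a `*framework.NominatingInfo` through the failure-recording path in place of a bare `nominatedNode string`, so callers can distinguish "leave `.status.nominatedNodeName` alone" (a nil pointer, noop mode) from "override it", where the override value may be the empty string in order to clear a stale nomination. The type itself sits outside this diff; below is a minimal sketch consistent with the call sites, and the nil-safe `Mode()` receiver is an inference from the fact that `scheduleOne` can pass a nil `nominatingInfo` all the way into `updatePod`:

```go
// Minimal sketch of the nominating types this diff relies on; the real
// definitions live in pkg/scheduler/framework and may differ in detail.
type NominatingMode int

const (
	ModeNoop     NominatingMode = iota // leave .status.nominatedNodeName untouched
	ModeOverride                       // force it to NominatedNodeName, "" included
)

type NominatingInfo struct {
	NominatedNodeName string
	NominatingMode    NominatingMode
}

// Mode is nil-safe on purpose: a nil *NominatingInfo reads as "noop", which
// lets updatePod call nominatingInfo.Mode() without a nil check.
func (ni *NominatingInfo) Mode() NominatingMode {
	if ni == nil {
		return ModeNoop
	}
	return ni.NominatingMode
}
```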

@@ -299,7 +299,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
// recordSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
-func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
+func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
sched.Error(podInfo, err)
// Update the scheduling queue with the nominated pod information. Without
@@ -307,7 +307,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo
// and the time the scheduler receives a Pod Update for the nominated pod.
// Here we check for nil only for tests.
if sched.SchedulingQueue != nil {
-sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)
+sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatingInfo)
}
pod := podInfo.Pod
@@ -318,7 +318,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
-}, nominatedNode); err != nil {
+}, nominatingInfo); err != nil {
klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
}
}
@@ -333,17 +333,17 @@ func truncateMessage(message string) string {
return message[:max-len(suffix)] + suffix
}
-func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
+func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
podStatusCopy := pod.Status.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
// different from the existing one.
-if !podutil.UpdatePodCondition(podStatusCopy, condition) &&
-(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
+nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
+if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
return nil
}
-if nominatedNode != "" {
-podStatusCopy.NominatedNodeName = nominatedNode
+if nnnNeedsUpdate {
+podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
}
return util.PatchPodStatus(client, pod, podStatusCopy)
}
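To see what the new `nnnNeedsUpdate` predicate does in practice, here is a small self-contained sketch (it repeats the assumed type definitions from above so it runs on its own) that walks the predicate through the interesting cases: a nil `nominatingInfo` never patches, `clearNominatedNode` patches only when a nomination is actually set, and an override patches only when the value changes:

```go
package main

import "fmt"

// Types repeated from the sketch above so this snippet runs on its own.
type NominatingMode int

const (
	ModeNoop NominatingMode = iota
	ModeOverride
)

type NominatingInfo struct {
	NominatedNodeName string
	NominatingMode    NominatingMode
}

func (ni *NominatingInfo) Mode() NominatingMode {
	if ni == nil {
		return ModeNoop
	}
	return ni.NominatingMode
}

func main() {
	clearNominatedNode := &NominatingInfo{NominatingMode: ModeOverride, NominatedNodeName: ""}

	cases := []struct {
		desc    string
		current string // pod.Status.NominatedNodeName before the update
		ni      *NominatingInfo
	}{
		{"nil info: never patch", "node-a", nil},
		{"clear a stale nomination", "node-a", clearNominatedNode},
		{"already empty: skip the patch", "", clearNominatedNode},
		{"override with a new node", "node-a", &NominatingInfo{NominatingMode: ModeOverride, NominatedNodeName: "node-b"}},
	}
	for _, tc := range cases {
		// The same predicate as in updatePod above, minus the package qualifier.
		nnnNeedsUpdate := tc.ni.Mode() == ModeOverride && tc.current != tc.ni.NominatedNodeName
		fmt.Printf("%-30s -> nnnNeedsUpdate=%v\n", tc.desc, nnnNeedsUpdate)
	}
}
```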
@@ -417,6 +417,10 @@ func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod,
fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}
+var (
+clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
+)
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
@@ -454,7 +458,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
-nominatedNode := ""
+var nominatingInfo *framework.NominatingInfo
if fitError, ok := err.(*framework.FitError); ok {
if !fwk.HasPostFilterPlugins() {
klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
@@ -466,8 +470,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
} else {
klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
}
-if status.IsSuccess() && result != nil {
-nominatedNode = result.NominatedNodeName
+if result != nil {
+nominatingInfo = result.NominatingInfo
}
}
// Pod did not fit anywhere, so it is counted as a failure. If preemption
@@ -475,13 +479,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// schedule it. (hopefully)
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
} else if err == ErrNoNodesAvailable {
+nominatingInfo = clearNominatedNode
// No nodes available is counted as unschedulable rather than an error.
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
} else {
+nominatingInfo = clearNominatedNode
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
}
-sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
+sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
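Note the loosened condition above: `if result != nil` replaces `if status.IsSuccess() && result != nil`, so even an unsuccessful PostFilter run can hand back nominating intent, and the no-nodes and plain-error branches now pass `clearNominatedNode` so a nomination left over from an earlier cycle gets wiped. Given how `result.NominatingInfo` is read, the result type presumably embeds the nominating info; this is an assumption, not something shown in the diff:

```go
// Assumed sketch, not part of this diff: PostFilterResult carries the
// nominating intent by embedding *NominatingInfo (defined in the sketch at
// the top of this page), so result.NominatingInfo above reads the embedded
// pointer directly and a failed PostFilter can still ask the scheduler to
// clear a stale NominatedNodeName.
type PostFilterResult struct {
	*NominatingInfo
}
```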
@@ -498,7 +504,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
-sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
+sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
return
}
@@ -510,7 +516,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
-sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
+sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
return
}
@@ -530,7 +536,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
-sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
+sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@@ -573,7 +579,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
return assumedPod.UID != pod.UID
})
}
-sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
+sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@@ -591,7 +597,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
-sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
+sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
return
}
@@ -608,7 +614,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
-sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
+sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
if klog.V(2).Enabled() {