diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 9b9f4bebfa8..af5e46d1ae2 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -186,7 +186,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
 	}
 
 	// 5) Perform preparation work before nominating the selected candidate.
-	if status := ev.prepareCandidate(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
+	if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
 		return nil, status
 	}
 
@@ -207,7 +207,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
 	if len(potentialNodes) == 0 {
 		klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
 		// In this case, we should clean-up any existing nominated node name of the pod.
-		if err := util.ClearNominatedNodeName(ev.Handler.ClientSet(), pod); err != nil {
+		if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), pod); err != nil {
 			klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
 			// We do not return as this error is not critical.
 		}
@@ -328,7 +328,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
 // - Evict the victim pods
 // - Reject the victim pods if they are in waitingPod map
 // - Clear the low-priority pods' nominatedNodeName status if needed
-func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
+func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
 	fh := ev.Handler
 	cs := ev.Handler.ClientSet()
 	for _, victim := range c.Victims().Pods {
@@ -336,7 +336,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
 		// Otherwise we should delete the victim.
 		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
 			waitingPod.Reject(pluginName, "preempted")
-		} else if err := util.DeletePod(cs, victim); err != nil {
+		} else if err := util.DeletePod(ctx, cs, victim); err != nil {
 			klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
 			return framework.AsStatus(err)
 		}
@@ -350,7 +350,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
 	// nomination updates these pods and moves them to the active queue. It
 	// lets scheduler find another place for them.
 	nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
-	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
+	if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
 		klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
 		// We do not return as this error is not critical.
 	}
diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go
index 584c31a8f76..dbb739778f6 100644
--- a/pkg/scheduler/schedule_one.go
+++ b/pkg/scheduler/schedule_one.go
@@ -129,7 +129,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		}
-		sched.handleSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
+		sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
 		return
 	}
 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@@ -146,7 +146,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
 		return
 	}
 
@@ -158,7 +158,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
 		return
 	}
 
@@ -178,7 +178,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
 		return
 	}
 
@@ -221,7 +221,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 				return assumedPod.UID != pod.UID
 			})
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
 		return
 	}
 
@@ -239,7 +239,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			// TODO(#103853): de-duplicate the logic.
 			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
 		return
 	}
 
@@ -256,7 +256,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			// TODO(#103853): de-duplicate the logic.
 			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
 		return
 	}
 	// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
@@ -809,7 +809,7 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
 
 // handleSchedulingFailure records an event for the pod that indicates the
 // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
-func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
+func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
 	sched.Error(podInfo, err)
 
 	// Update the scheduling queue with the nominated pod information. Without
@@ -823,7 +823,7 @@ func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo
 	pod := podInfo.Pod
 	msg := truncateMessage(err.Error())
 	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
-	if err := updatePod(sched.client, pod, &v1.PodCondition{
+	if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
 		Type:    v1.PodScheduled,
 		Status:  v1.ConditionFalse,
 		Reason:  reason,
@@ -843,7 +843,7 @@ func truncateMessage(message string) string {
 	return message[:max-len(suffix)] + suffix
 }
 
-func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
+func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
 	klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
 	podStatusCopy := pod.Status.DeepCopy()
 	// NominatedNodeName is updated only if we are trying to set it, and the value is
@@ -855,5 +855,5 @@ func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodConditi
 	if nnnNeedsUpdate {
 		podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
 	}
-	return util.PatchPodStatus(client, pod, podStatusCopy)
+	return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
 }
diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go
index 8bc6a24dcf6..c563e6c8952 100644
--- a/pkg/scheduler/schedule_one_test.go
+++ b/pkg/scheduler/schedule_one_test.go
@@ -1237,7 +1237,9 @@ func TestUpdatePod(t *testing.T) {
 
 			pod := st.MakePod().Name("foo").NominatedNodeName(test.currentNominatedNodeName).Conditions(test.currentPodConditions).Obj()
 
-			if err := updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			if err := updatePod(ctx, cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
 				t.Fatalf("Error calling update: %v", err)
 			}
 
diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go
index 3d4656c6e21..929dfb8fa44 100644
--- a/pkg/scheduler/util/utils.go
+++ b/pkg/scheduler/util/utils.go
@@ -92,7 +92,7 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
 
 // PatchPodStatus calculates the delta bytes change from <old> to <newStatus>,
 // and then submit a request to API server to patch the pod changes.
-func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
+func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
 	if newStatus == nil {
 		return nil
 	}
@@ -115,18 +115,18 @@ func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatu
 		return nil
 	}
 
-	_, err = cs.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	_, err = cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 	return err
 }
 
 // DeletePod deletes the given <pod> from API server
-func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
-	return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+func DeletePod(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) error {
+	return cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
 }
 
 // ClearNominatedNodeName internally submit a patch request to API server
 // to set each pods[*].Status.NominatedNodeName> to "".
-func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
+func ClearNominatedNodeName(ctx context.Context, cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
 	var errs []error
 	for _, p := range pods {
 		if len(p.Status.NominatedNodeName) == 0 {
@@ -134,7 +134,7 @@ func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors
 		}
 		podStatusCopy := p.Status.DeepCopy()
 		podStatusCopy.NominatedNodeName = ""
-		if err := PatchPodStatus(cs, p, podStatusCopy); err != nil {
+		if err := PatchPodStatus(ctx, cs, p, podStatusCopy); err != nil {
 			errs = append(errs, err)
 		}
 	}
diff --git a/pkg/scheduler/util/utils_test.go b/pkg/scheduler/util/utils_test.go
index ef0c77360da..a97f2a10546 100644
--- a/pkg/scheduler/util/utils_test.go
+++ b/pkg/scheduler/util/utils_test.go
@@ -19,10 +19,11 @@ package util
 import (
 	"context"
 	"fmt"
-	"github.com/google/go-cmp/cmp"
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -161,7 +162,9 @@ func TestRemoveNominatedNodeName(t *testing.T) {
 				Status: v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
 			}
 
-			if err := ClearNominatedNodeName(cs, pod); err != nil {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			if err := ClearNominatedNodeName(ctx, cs, pod); err != nil {
 				t.Fatalf("Error calling removeNominatedNodeName: %v", err)
 			}
 
@@ -236,12 +239,14 @@ func TestPatchPodStatus(t *testing.T) {
 				t.Fatal(err)
 			}
 
-			err = PatchPodStatus(client, &tc.pod, &tc.statusToUpdate)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate)
 			if err != nil {
 				t.Fatal(err)
 			}
 
-			retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(context.TODO(), tc.pod.Name, metav1.GetOptions{})
+			retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
 			if err != nil {
 				t.Fatal(err)
 			}
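The change above is mechanical, but the pattern is worth stating once: every scheduler helper that talks to the API server now takes the caller's context.Context and hands it to client-go, instead of minting context.TODO() at the call site, so in-flight requests are cancelled together with the scheduling cycle's context. Below is a minimal standalone sketch of that pattern; the deletePod helper, the "victim" pod, and the fake clientset are illustrative stand-ins for this note, not code from the patch.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// deletePod mirrors the shape of the updated util.DeletePod: the context is
// supplied by the caller, so cancelling it abandons the in-flight request.
func deletePod(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) error {
	return cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
}

func main() {
	// Callers own the context lifetime, as the updated tests do with
	// context.WithCancel(context.Background()) and defer cancel().
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A made-up pod for this sketch, pre-loaded into a fake clientset.
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "victim", Namespace: "default"}}
	cs := fake.NewSimpleClientset(pod)

	if err := deletePod(ctx, cs, pod); err != nil {
		fmt.Println("delete failed:", err)
		return
	}
	fmt.Println("deleted", pod.Namespace+"/"+pod.Name)
}

The test updates in the diff follow the same shape: derive a cancellable context once per test case and pass it through, rather than letting the library calls default to context.TODO().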