From 67b0ce87feec5283bdf974e27f9ce019cbebcb84 Mon Sep 17 00:00:00 2001
From: kerthcet
Date: Wed, 25 May 2022 10:14:40 +0800
Subject: [PATCH] Using inherited context in utils

This PR focuses on the utils in the scheduler. When patching a pod's
status or deleting a pod, it is better to use an inherited context.
Although this does not have a big effect on preventing goroutine leaks,
it is a best practice.

Signed-off-by: kerthcet
---
 .../framework/preemption/preemption.go | 10 ++++-----
 pkg/scheduler/schedule_one.go          | 22 +++++++++----------
 pkg/scheduler/schedule_one_test.go     |  4 +++-
 pkg/scheduler/util/utils.go            | 12 +++++-----
 pkg/scheduler/util/utils_test.go       | 13 +++++++----
 5 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 9b9f4bebfa8..af5e46d1ae2 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -186,7 +186,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
 	}
 
 	// 5) Perform preparation work before nominating the selected candidate.
-	if status := ev.prepareCandidate(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
+	if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
 		return nil, status
 	}
 
@@ -207,7 +207,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
 	if len(potentialNodes) == 0 {
 		klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
 		// In this case, we should clean-up any existing nominated node name of the pod.
-		if err := util.ClearNominatedNodeName(ev.Handler.ClientSet(), pod); err != nil {
+		if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), pod); err != nil {
 			klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
 			// We do not return as this error is not critical.
 		}
@@ -328,7 +328,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
 // - Evict the victim pods
 // - Reject the victim pods if they are in waitingPod map
 // - Clear the low-priority pods' nominatedNodeName status if needed
-func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
+func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
 	fh := ev.Handler
 	cs := ev.Handler.ClientSet()
 	for _, victim := range c.Victims().Pods {
@@ -336,7 +336,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
 		// Otherwise we should delete the victim.
 		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
 			waitingPod.Reject(pluginName, "preempted")
-		} else if err := util.DeletePod(cs, victim); err != nil {
+		} else if err := util.DeletePod(ctx, cs, victim); err != nil {
 			klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
 			return framework.AsStatus(err)
 		}
@@ -350,7 +350,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
 	// nomination updates these pods and moves them to the active queue. It
 	// lets scheduler find another place for them.
 	nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
-	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
+	if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
 		klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
 		// We do not return as this error is not critical.
 	}
diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go
index 584c31a8f76..dbb739778f6 100644
--- a/pkg/scheduler/schedule_one.go
+++ b/pkg/scheduler/schedule_one.go
@@ -129,7 +129,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
 		}
-		sched.handleSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
+		sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
 		return
 	}
 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@@ -146,7 +146,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
 		return
 	}
 
@@ -158,7 +158,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
 		return
 	}
 
@@ -178,7 +178,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
 			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
 		return
 	}
 
@@ -221,7 +221,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 				return assumedPod.UID != pod.UID
 			})
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
 		return
 	}
 
@@ -239,7 +239,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			// TODO(#103853): de-duplicate the logic.
 			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
 		return
 	}
 
@@ -256,7 +256,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			// TODO(#103853): de-duplicate the logic.
 			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
 		}
-		sched.handleSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
+		sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
 		return
 	}
 	// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
@@ -809,7 +809,7 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
 
 // handleSchedulingFailure records an event for the pod that indicates the
 // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
-func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
+func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
 	sched.Error(podInfo, err)
 
 	// Update the scheduling queue with the nominated pod information. Without
@@ -823,7 +823,7 @@ func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo
 	pod := podInfo.Pod
 	msg := truncateMessage(err.Error())
 	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
-	if err := updatePod(sched.client, pod, &v1.PodCondition{
+	if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
 		Type:    v1.PodScheduled,
 		Status:  v1.ConditionFalse,
 		Reason:  reason,
@@ -843,7 +843,7 @@ func truncateMessage(message string) string {
 	return message[:max-len(suffix)] + suffix
 }
 
-func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
+func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
 	klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
 	podStatusCopy := pod.Status.DeepCopy()
 	// NominatedNodeName is updated only if we are trying to set it, and the value is
@@ -855,5 +855,5 @@ func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodConditi
 	if nnnNeedsUpdate {
 		podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
 	}
-	return util.PatchPodStatus(client, pod, podStatusCopy)
+	return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
 }
diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go
index 8bc6a24dcf6..c563e6c8952 100644
--- a/pkg/scheduler/schedule_one_test.go
+++ b/pkg/scheduler/schedule_one_test.go
@@ -1237,7 +1237,9 @@ func TestUpdatePod(t *testing.T) {
 
 			pod := st.MakePod().Name("foo").NominatedNodeName(test.currentNominatedNodeName).Conditions(test.currentPodConditions).Obj()
 
-			if err := updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			if err := updatePod(ctx, cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
 				t.Fatalf("Error calling update: %v", err)
 			}
 
diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go
index 3d4656c6e21..929dfb8fa44 100644
--- a/pkg/scheduler/util/utils.go
+++ b/pkg/scheduler/util/utils.go
@@ -92,7 +92,7 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
 
 // PatchPodStatus calculates the delta bytes change from <old> to <newStatus>,
 // and then submit a request to API server to patch the pod changes.
-func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
+func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
 	if newStatus == nil {
 		return nil
 	}
@@ -115,18 +115,18 @@ func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatu
 		return nil
 	}
 
-	_, err = cs.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	_, err = cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 	return err
 }
 
 // DeletePod deletes the given <pod> from API server
-func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
-	return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+func DeletePod(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) error {
+	return cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
 }
 
 // ClearNominatedNodeName internally submit a patch request to API server
 // to set each pods[*].Status.NominatedNodeName to "".
-func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
+func ClearNominatedNodeName(ctx context.Context, cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
 	var errs []error
 	for _, p := range pods {
 		if len(p.Status.NominatedNodeName) == 0 {
@@ -134,7 +134,7 @@ func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors
 		}
 		podStatusCopy := p.Status.DeepCopy()
 		podStatusCopy.NominatedNodeName = ""
-		if err := PatchPodStatus(cs, p, podStatusCopy); err != nil {
+		if err := PatchPodStatus(ctx, cs, p, podStatusCopy); err != nil {
 			errs = append(errs, err)
 		}
 	}
diff --git a/pkg/scheduler/util/utils_test.go b/pkg/scheduler/util/utils_test.go
index ef0c77360da..a97f2a10546 100644
--- a/pkg/scheduler/util/utils_test.go
+++ b/pkg/scheduler/util/utils_test.go
@@ -19,10 +19,11 @@ package util
 import (
 	"context"
 	"fmt"
-	"github.com/google/go-cmp/cmp"
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -161,7 +162,9 @@ func TestRemoveNominatedNodeName(t *testing.T) {
 			Status: v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
 		}
 
-		if err := ClearNominatedNodeName(cs, pod); err != nil {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		if err := ClearNominatedNodeName(ctx, cs, pod); err != nil {
 			t.Fatalf("Error calling removeNominatedNodeName: %v", err)
 		}
 
@@ -236,12 +239,14 @@ func TestPatchPodStatus(t *testing.T) {
 				t.Fatal(err)
 			}
 
-			err = PatchPodStatus(client, &tc.pod, &tc.statusToUpdate)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate)
 			if err != nil {
 				t.Fatal(err)
 			}
 
-			retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(context.TODO(), tc.pod.Name, metav1.GetOptions{})
+			retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
 			if err != nil {
 				t.Fatal(err)
 			}
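
Note: the sketch below is a minimal, self-contained illustration of why the
inherited context matters; it is not part of the patch. Here doCall is a
hypothetical stand-in for a client-go call such as
cs.CoreV1().Pods(ns).Delete(ctx, ...). With an inherited context the in-flight
request is abandoned as soon as the caller is cancelled; with context.TODO(),
which is never cancelled, the call blocks until it completes on its own.

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// doCall simulates a slow API-server round-trip that honors ctx
	// cancellation, the way client-go requests do when given a real context.
	func doCall(ctx context.Context) error {
		select {
		case <-time.After(3 * time.Second): // simulated network latency
			return nil
		case <-ctx.Done():
			return ctx.Err() // returns promptly once the caller cancels
		}
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			time.Sleep(100 * time.Millisecond)
			cancel() // e.g. the scheduler is shutting down
		}()

		fmt.Println(doCall(ctx))            // context.Canceled after ~100ms
		fmt.Println(doCall(context.TODO())) // never cancelled; blocks the full 3s
	}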