Using inherited context in utils

This PR focus on utils in scheduler. When to patch pod status or delete pod,
it is better to use an inherited context.
Although this do not take a big effect in preventing goroutine leak, but it is a best practice

Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
kerthcet 2022-05-25 10:14:40 +08:00
parent cc71683fda
commit 67b0ce87fe
5 changed files with 34 additions and 27 deletions

View File

@ -186,7 +186,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
}
// 5) Perform preparation work before nominating the selected candidate.
if status := ev.prepareCandidate(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
}
@ -207,7 +207,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
if len(potentialNodes) == 0 {
klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
// In this case, we should clean-up any existing nominated node name of the pod.
if err := util.ClearNominatedNodeName(ev.Handler.ClientSet(), pod); err != nil {
if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), pod); err != nil {
klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
// We do not return as this error is not critical.
}
@ -328,7 +328,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
fh := ev.Handler
cs := ev.Handler.ClientSet()
for _, victim := range c.Victims().Pods {
@ -336,7 +336,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
// Otherwise we should delete the victim.
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
} else if err := util.DeletePod(cs, victim); err != nil {
} else if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
}
@ -350,7 +350,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
// We do not return as this error is not critical.
}

View File

@ -129,7 +129,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
}
sched.handleSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@ -146,7 +146,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.handleSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
return
}
@ -158,7 +158,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
return
}
@ -178,7 +178,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@ -221,7 +221,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
return assumedPod.UID != pod.UID
})
}
sched.handleSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@ -239,7 +239,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.handleSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
return
}
@ -256,7 +256,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.handleSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
return
}
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
@ -809,7 +809,7 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
// handleSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
sched.Error(podInfo, err)
// Update the scheduling queue with the nominated pod information. Without
@ -823,7 +823,7 @@ func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo
pod := podInfo.Pod
msg := truncateMessage(err.Error())
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
if err := updatePod(sched.client, pod, &v1.PodCondition{
if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
@ -843,7 +843,7 @@ func truncateMessage(message string) string {
return message[:max-len(suffix)] + suffix
}
func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
podStatusCopy := pod.Status.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
@ -855,5 +855,5 @@ func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodConditi
if nnnNeedsUpdate {
podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
}
return util.PatchPodStatus(client, pod, podStatusCopy)
return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
}

View File

@ -1237,7 +1237,9 @@ func TestUpdatePod(t *testing.T) {
pod := st.MakePod().Name("foo").NominatedNodeName(test.currentNominatedNodeName).Conditions(test.currentPodConditions).Obj()
if err := updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := updatePod(ctx, cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
t.Fatalf("Error calling update: %v", err)
}

View File

@ -92,7 +92,7 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
// PatchPodStatus calculates the delta bytes change from <old.Status> to <newStatus>,
// and then submit a request to API server to patch the pod changes.
func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
if newStatus == nil {
return nil
}
@ -115,18 +115,18 @@ func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatu
return nil
}
_, err = cs.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
_, err = cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
// DeletePod deletes the given <pod> from API server
func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
func DeletePod(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) error {
return cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
}
// ClearNominatedNodeName internally submit a patch request to API server
// to set each pods[*].Status.NominatedNodeName> to "".
func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
func ClearNominatedNodeName(ctx context.Context, cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
var errs []error
for _, p := range pods {
if len(p.Status.NominatedNodeName) == 0 {
@ -134,7 +134,7 @@ func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors
}
podStatusCopy := p.Status.DeepCopy()
podStatusCopy.NominatedNodeName = ""
if err := PatchPodStatus(cs, p, podStatusCopy); err != nil {
if err := PatchPodStatus(ctx, cs, p, podStatusCopy); err != nil {
errs = append(errs, err)
}
}

View File

@ -19,10 +19,11 @@ package util
import (
"context"
"fmt"
"github.com/google/go-cmp/cmp"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -161,7 +162,9 @@ func TestRemoveNominatedNodeName(t *testing.T) {
Status: v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
}
if err := ClearNominatedNodeName(cs, pod); err != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := ClearNominatedNodeName(ctx, cs, pod); err != nil {
t.Fatalf("Error calling removeNominatedNodeName: %v", err)
}
@ -236,12 +239,14 @@ func TestPatchPodStatus(t *testing.T) {
t.Fatal(err)
}
err = PatchPodStatus(client, &tc.pod, &tc.statusToUpdate)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate)
if err != nil {
t.Fatal(err)
}
retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(context.TODO(), tc.pod.Name, metav1.GetOptions{})
retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}