Merge pull request #110202 from kerthcet/cleanup/remove-potential-goroutine-leak-in-utils

Using inherited context in utils
Authored by Kubernetes Prow Robot on 2022-06-14 09:09:34 -07:00; committed by GitHub
commit 597cb5fac8
5 changed files with 34 additions and 27 deletions
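The theme of every hunk below is the same: these scheduler helpers used to open a detached context.TODO() at the point of the API call, which can never be cancelled, so requests (and the goroutines servicing them) could outlive the scheduler. After this change the helpers inherit the caller's context instead. A minimal sketch of the pattern, with deletePodDetached, deletePod, and fakeAPICall as illustrative stand-ins rather than the scheduler's real code:

package main

import (
	"context"
	"fmt"
	"time"
)

// Before: a detached context gives the caller no way to cancel the request.
func deletePodDetached(name string) error {
	return fakeAPICall(context.TODO(), name)
}

// After: the inherited ctx lets cancellation (e.g. scheduler shutdown)
// abort an in-flight request instead of leaking it.
func deletePod(ctx context.Context, name string) error {
	return fakeAPICall(ctx, name)
}

// fakeAPICall stands in for a clientset call such as Pods(ns).Delete.
func fakeAPICall(ctx context.Context, name string) error {
	select {
	case <-time.After(time.Second): // simulated API round-trip
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown before the call completes
	fmt.Println(deletePod(ctx, "victim")) // prints "context canceled"
}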

View File

@@ -186,7 +186,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
}
// 5) Perform preparation work before nominating the selected candidate.
-if status := ev.prepareCandidate(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
+if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
}
@@ -207,7 +207,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
if len(potentialNodes) == 0 {
klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
// In this case, we should clean up any existing nominated node name of the pod.
-if err := util.ClearNominatedNodeName(ev.Handler.ClientSet(), pod); err != nil {
+if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), pod); err != nil {
klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
// We do not return as this error is not critical.
}
@@ -328,7 +328,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
-func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
+func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
fh := ev.Handler
cs := ev.Handler.ClientSet()
for _, victim := range c.Victims().Pods {
@@ -336,7 +336,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
// Otherwise we should delete the victim.
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
-} else if err := util.DeletePod(cs, victim); err != nil {
+} else if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
}
@@ -350,7 +350,7 @@ func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName strin
// nomination updates these pods and moves them to the active queue. It
// lets the scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
-if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
+if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
// We do not return as this error is not critical.
}

View File

@@ -129,7 +129,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
}
-sched.handleSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
+sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@@ -146,7 +146,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
-sched.handleSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
+sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
return
}
@@ -158,7 +158,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
-sched.handleSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
+sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
return
}
@@ -178,7 +178,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
-sched.handleSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
+sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@@ -221,7 +221,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
return assumedPod.UID != pod.UID
})
}
-sched.handleSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
+sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return
}
@@ -239,7 +239,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
-sched.handleSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
+sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
return
}
@@ -256,7 +256,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
-sched.handleSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
+sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
return
}
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
@@ -809,7 +809,7 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
// handleSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. It also updates the pod condition and the nominated node name if set.
-func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
+func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
sched.Error(podInfo, err)
// Update the scheduling queue with the nominated pod information. Without
@@ -823,7 +823,7 @@ func (sched *Scheduler) handleSchedulingFailure(fwk framework.Framework, podInfo
pod := podInfo.Pod
msg := truncateMessage(err.Error())
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
-if err := updatePod(sched.client, pod, &v1.PodCondition{
+if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
@@ -843,7 +843,7 @@ func truncateMessage(message string) string {
return message[:max-len(suffix)] + suffix
}
-func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
+func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
podStatusCopy := pod.Status.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
@@ -855,5 +855,5 @@ func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodConditi
if nnnNeedsUpdate {
podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
}
-return util.PatchPodStatus(client, pod, podStatusCopy)
+return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
}
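Taken together, the hunks in this file thread one context through the whole failure path: scheduleOne already received a ctx, and it now flows through handleSchedulingFailure and updatePod down to util.PatchPodStatus. A compressed sketch with placeholder names, not the scheduler's real signatures:

package main

import (
	"context"
	"fmt"
)

// Each layer simply forwards the ctx it received; only the bottom layer
// uses it, standing in for the PATCH request to the API server.
func scheduleOne(ctx context.Context) error   { return handleFailure(ctx) }
func handleFailure(ctx context.Context) error { return update(ctx) }
func update(ctx context.Context) error        { return patchStatus(ctx) }
func patchStatus(ctx context.Context) error   { return ctx.Err() }

func main() {
	fmt.Println(scheduleOne(context.Background())) // <nil>: context still live
}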

View File

@@ -1237,7 +1237,9 @@ func TestUpdatePod(t *testing.T) {
pod := st.MakePod().Name("foo").NominatedNodeName(test.currentNominatedNodeName).Conditions(test.currentPodConditions).Obj()
-if err := updatePod(cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+if err := updatePod(ctx, cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
t.Fatalf("Error calling update: %v", err)
}
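The test now builds a cancellable context per case; defer cancel() releases the context's resources even when t.Fatalf ends the test early. A standalone version of the idiom, where doWork is a purely illustrative helper:

package util

import (
	"context"
	"testing"
)

// doWork is a placeholder for any helper that, like updatePod above,
// now takes the caller's context.
func doWork(ctx context.Context) error { return ctx.Err() }

func TestDoWork(t *testing.T) {
	// One cancellable context per test; defer cancel() guarantees cleanup
	// even if the test exits early.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := doWork(ctx); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}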

View File

@@ -92,7 +92,7 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
// PatchPodStatus calculates the delta bytes change from <old.Status> to <newStatus>,
// and then submits a request to the API server to patch the pod changes.
-func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
+func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatus) error {
if newStatus == nil {
return nil
}
@@ -115,18 +115,18 @@ func PatchPodStatus(cs kubernetes.Interface, old *v1.Pod, newStatus *v1.PodStatu
return nil
}
-_, err = cs.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+_, err = cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
// DeletePod deletes the given <pod> from the API server
-func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
-return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+func DeletePod(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) error {
+return cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
}
// ClearNominatedNodeName internally submits a patch request to the API server
// to set each <pods[*].Status.NominatedNodeName> to "".
-func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
+func ClearNominatedNodeName(ctx context.Context, cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
var errs []error
for _, p := range pods {
if len(p.Status.NominatedNodeName) == 0 {
@@ -134,7 +134,7 @@ func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors
}
podStatusCopy := p.Status.DeepCopy()
podStatusCopy.NominatedNodeName = ""
-if err := PatchPodStatus(cs, p, podStatusCopy); err != nil {
+if err := PatchPodStatus(ctx, cs, p, podStatusCopy); err != nil {
errs = append(errs, err)
}
}
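With the new signatures, the context supplied by a caller reaches the underlying clientset call. An illustrative exercise of the same idea against client-go's fake clientset (this snippet is not part of the commit):

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"}}
	cs := fake.NewSimpleClientset(pod)

	// The caller's ctx, not context.TODO(), governs the request; cancelling
	// ctx aborts the call, just as DeletePod above now allows.
	err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
	fmt.Println(err) // <nil>
}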

View File

@@ -19,10 +19,11 @@ package util
import (
"context"
"fmt"
"github.com/google/go-cmp/cmp"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -161,7 +162,9 @@ func TestRemoveNominatedNodeName(t *testing.T) {
Status: v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
}
-if err := ClearNominatedNodeName(cs, pod); err != nil {
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+if err := ClearNominatedNodeName(ctx, cs, pod); err != nil {
t.Fatalf("Error calling removeNominatedNodeName: %v", err)
}
@@ -236,12 +239,14 @@ func TestPatchPodStatus(t *testing.T) {
t.Fatal(err)
}
-err = PatchPodStatus(client, &tc.pod, &tc.statusToUpdate)
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate)
if err != nil {
t.Fatal(err)
}
-retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(context.TODO(), tc.pod.Name, metav1.GetOptions{})
+retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}