diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 0fad6f30b0e..8834fb5fe43 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -37,6 +37,7 @@ import ( apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -338,7 +339,12 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate { func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status { fh := ev.Handler cs := ev.Handler.ClientSet() - for _, victim := range c.Victims().Pods { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + errCh := parallelize.NewErrorChannel() + preemptPod := func(index int) { + victim := c.Victims().Pods[index] // If the victim is a WaitingPod, send a reject message to the PermitPlugin. // Otherwise we should delete the victim. if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil { @@ -355,18 +361,26 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. if apipod.UpdatePodCondition(newStatus, condition) { if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil { klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) - return framework.AsStatus(err) + errCh.SendErrorWithCancel(err, cancel) + return } } } if err := util.DeletePod(ctx, cs, victim); err != nil { klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) - return framework.AsStatus(err) + errCh.SendErrorWithCancel(err, cancel) + return } } fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, c.Name()) } + + fh.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName) + if err := errCh.ReceiveError(); err != nil { + return framework.AsStatus(err) + } + metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods))) // Lower priority pods nominated to run on this node, may no longer fit on