Do pod preemption in parallel.

This commit is contained in:
tangwz 2022-10-18 11:27:34 +08:00
parent 507cf76570
commit 480ae2a7c1

View File

@ -37,6 +37,7 @@ import (
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -338,7 +339,12 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
fh := ev.Handler
cs := ev.Handler.ClientSet()
for _, victim := range c.Victims().Pods {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errCh := parallelize.NewErrorChannel()
preemptPod := func(index int) {
victim := c.Victims().Pods[index]
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
// Otherwise we should delete the victim.
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
@ -355,18 +361,26 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
if apipod.UpdatePodCondition(newStatus, condition) {
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
errCh.SendErrorWithCancel(err, cancel)
return
}
}
}
if err := util.DeletePod(ctx, cs, victim); err != nil {
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
return framework.AsStatus(err)
errCh.SendErrorWithCancel(err, cancel)
return
}
}
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
pod.Namespace, pod.Name, c.Name())
}
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
return framework.AsStatus(err)
}
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
// Lower priority pods nominated to run on this node, may no longer fit on