mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #113127 from tangwz/parallel_preemption
Do pod preemption in parallel.
This commit is contained in:
commit
ca17d9ba2b
@ -37,6 +37,7 @@ import (
|
|||||||
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
|
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
|
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||||
)
|
)
|
||||||
@ -338,7 +339,12 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
|
|||||||
func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
|
func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
|
||||||
fh := ev.Handler
|
fh := ev.Handler
|
||||||
cs := ev.Handler.ClientSet()
|
cs := ev.Handler.ClientSet()
|
||||||
for _, victim := range c.Victims().Pods {
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
errCh := parallelize.NewErrorChannel()
|
||||||
|
preemptPod := func(index int) {
|
||||||
|
victim := c.Victims().Pods[index]
|
||||||
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
|
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
|
||||||
// Otherwise we should delete the victim.
|
// Otherwise we should delete the victim.
|
||||||
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
|
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
|
||||||
@ -355,18 +361,26 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
|
|||||||
if apipod.UpdatePodCondition(newStatus, condition) {
|
if apipod.UpdatePodCondition(newStatus, condition) {
|
||||||
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
|
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
|
||||||
klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
|
klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
|
||||||
return framework.AsStatus(err)
|
errCh.SendErrorWithCancel(err, cancel)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := util.DeletePod(ctx, cs, victim); err != nil {
|
if err := util.DeletePod(ctx, cs, victim); err != nil {
|
||||||
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
|
klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
|
||||||
return framework.AsStatus(err)
|
errCh.SendErrorWithCancel(err, cancel)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
|
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
|
||||||
pod.Namespace, pod.Name, c.Name())
|
pod.Namespace, pod.Name, c.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName)
|
||||||
|
if err := errCh.ReceiveError(); err != nil {
|
||||||
|
return framework.AsStatus(err)
|
||||||
|
}
|
||||||
|
|
||||||
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
|
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
|
||||||
|
|
||||||
// Lower priority pods nominated to run on this node, may no longer fit on
|
// Lower priority pods nominated to run on this node, may no longer fit on
|
||||||
|
Loading…
Reference in New Issue
Block a user