fix: address reviews

This commit is contained in:
Kensei Nakada 2024-10-30 17:36:11 +09:00
parent 69a8d0ec0b
commit e40f3f40bd
2 changed files with 62 additions and 59 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
@ -133,7 +134,7 @@ type Evaluator struct {
mu sync.RWMutex
// preempting is a map that records the pods that are currently triggering preemption asynchronously,
// which is used to prevent the pods from entering the scheduling cycle meanwhile.
preempting map[types.UID]struct{}
preempting sets.Set[types.UID]
// PreemptPod is a function that actually makes API calls to preempt a specific Pod.
// This is exposed to be replaced during tests.
@ -146,19 +147,60 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy
podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
pdbLister := fh.SharedInformerFactory().Policy().V1().PodDisruptionBudgets().Lister()
e := &Evaluator{
ev := &Evaluator{
PluginName: names.DefaultPreemption,
Handler: fh,
PodLister: podLister,
PdbLister: pdbLister,
Interface: i,
enableAsyncPreemption: enableAsyncPreemption,
preempting: make(map[types.UID]struct{}),
preempting: sets.New[types.UID](),
}
e.PreemptPod = e.preemptPod
// PreemptPod actually makes API calls to preempt a specific Pod.
//
// We implement it here directly, rather than creating a separate method like ev.preemptPod(...)
// to prevent the misuse of the PreemptPod function.
ev.PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
logger := klog.FromContext(ctx)
return e
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
// Otherwise we should delete the victim.
if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
}
newStatus := victim.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, condition)
if updated {
if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
}
if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
if !apierrors.IsNotFound(err) {
logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
return nil
}
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
}
ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name())
return nil
}
return ev
}
// IsPodRunningPreemption returns true if the pod is currently triggering preemption asynchronously.
@ -384,46 +426,6 @@ func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate
return candidates[0]
}
// preemptPod actually makes API calls to preempt a specific Pod.
func (ev *Evaluator) preemptPod(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
logger := klog.FromContext(ctx)
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
// Otherwise we should delete the victim.
if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
}
newStatus := victim.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, condition)
if updated {
if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
}
if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
if !apierrors.IsNotFound(err) {
logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
return nil
}
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
}
ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name())
return nil
}
// prepareCandidate does some preparation work before nominating the selected candidate:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map

View File

@ -669,7 +669,7 @@ func TestAsyncPreemption(t *testing.T) {
},
},
{
name: "Higher priority Pod takes over the place for higher priority Pod that is running the preemption",
name: "Higher priority Pod takes over the place for lower priority Pod that is running the preemption",
scenarios: []scenario{
{
name: "create scheduled Pod",
@ -864,11 +864,7 @@ func TestAsyncPreemption(t *testing.T) {
if ch, ok := preemptionDoneChannels[preemptor.Name]; ok {
<-ch
}
err := preemptPodFn(ctx, c, preemptor, victim, pluginName)
if err != nil {
return err
}
return nil
return preemptPodFn(ctx, c, preemptor, victim, pluginName)
}
return preemptionPlugin, nil
@ -944,17 +940,22 @@ func TestAsyncPreemption(t *testing.T) {
createdPods = append(createdPods, pod)
}
case scenario.schedulePod != nil:
lastFailure := ""
if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
activePods := testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()
return len(activePods) != 0, nil
}); err != nil {
t.Fatalf("Expected the pod %s to be scheduled, but no pod arrives at the activeQ", scenario.schedulePod.podName)
}
if len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == 0 {
lastFailure = fmt.Sprintf("Expected the pod %s to be scheduled, but no pod arrives at the activeQ", scenario.schedulePod.podName)
return false, nil
}
if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
return testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name == scenario.schedulePod.podName, nil
if testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name != scenario.schedulePod.podName {
// need to wait more because maybe the queue will get another Pod that higher priority than the current top pod.
lastFailure = fmt.Sprintf("The pod %s is expected to be scheduled, but the top Pod is %s", scenario.schedulePod.podName, testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name)
return false, nil
}
return true, nil
}); err != nil {
t.Fatalf("The pod %s is expected to be scheduled, but the top Pod is %s", scenario.schedulePod.podName, testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name)
t.Fatal(lastFailure)
}
preemptionDoneChannels[scenario.schedulePod.podName] = make(chan struct{})