diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 4651a91022c..0ae9f65e030 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	policylisters "k8s.io/client-go/listers/policy/v1"
 	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
@@ -133,7 +134,7 @@ type Evaluator struct {
 	mu sync.RWMutex
 	// preempting is a map that records the pods that are currently triggering preemption asynchronously,
 	// which is used to prevent the pods from entering the scheduling cycle meanwhile.
-	preempting map[types.UID]struct{}
+	preempting sets.Set[types.UID]

 	// PreemptPod is a function that actually makes API calls to preempt a specific Pod.
 	// This is exposed to be replaced during tests.
@@ -146,19 +147,60 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy
 	podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
 	pdbLister := fh.SharedInformerFactory().Policy().V1().PodDisruptionBudgets().Lister()

-	e := &Evaluator{
+	ev := &Evaluator{
 		PluginName:            names.DefaultPreemption,
 		Handler:               fh,
 		PodLister:             podLister,
 		PdbLister:             pdbLister,
 		Interface:             i,
 		enableAsyncPreemption: enableAsyncPreemption,
-		preempting:            make(map[types.UID]struct{}),
+		preempting:            sets.New[types.UID](),
 	}
-	e.PreemptPod = e.preemptPod
+
+	// PreemptPod actually makes API calls to preempt a specific Pod.
+	//
+	// We implement it here directly, rather than creating a separate method like ev.preemptPod(...)
+	// to prevent the misuse of the PreemptPod function.
+	ev.PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
+		logger := klog.FromContext(ctx)

-	return e
+		// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
+		// Otherwise we should delete the victim.
+		if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
+			waitingPod.Reject(pluginName, "preempted")
+			logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
+		} else {
+			condition := &v1.PodCondition{
+				Type:    v1.DisruptionTarget,
+				Status:  v1.ConditionTrue,
+				Reason:  v1.PodReasonPreemptionByScheduler,
+				Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
+			}
+			newStatus := victim.Status.DeepCopy()
+			updated := apipod.UpdatePodCondition(newStatus, condition)
+			if updated {
+				if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil {
+					logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
+					return err
+				}
+			}
+			if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
+				if !apierrors.IsNotFound(err) {
+					logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
+					return err
+				}
+				logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
+				return nil
+			}
+			logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
+		}
+
+		ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name())
+
+		return nil
+	}
+
+	return ev
 }

 // IsPodRunningPreemption returns true if the pod is currently triggering preemption asynchronously.
@@ -384,46 +426,6 @@ func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate
 	return candidates[0]
 }

-// preemptPod actually makes API calls to preempt a specific Pod.
-func (ev *Evaluator) preemptPod(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
-	logger := klog.FromContext(ctx)
-
-	// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
-	// Otherwise we should delete the victim.
-	if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
-		waitingPod.Reject(pluginName, "preempted")
-		logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
-	} else {
-		condition := &v1.PodCondition{
-			Type:    v1.DisruptionTarget,
-			Status:  v1.ConditionTrue,
-			Reason:  v1.PodReasonPreemptionByScheduler,
-			Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
-		}
-		newStatus := victim.Status.DeepCopy()
-		updated := apipod.UpdatePodCondition(newStatus, condition)
-		if updated {
-			if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil {
-				logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
-				return err
-			}
-		}
-		if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
-			if !apierrors.IsNotFound(err) {
-				logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
-				return err
-			}
-			logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
-			return nil
-		}
-		logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
-	}
-
-	ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name())
-
-	return nil
-}
-
 // prepareCandidate does some preparation work before nominating the selected candidate:
 // - Evict the victim pods
 // - Reject the victim pods if they are in waitingPod map
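Note: the hunks above do two things: they replace the hand-rolled map[types.UID]struct{} with sets.Set[types.UID] from k8s.io/apimachinery/pkg/util/sets, and they move the preemption logic into the exported PreemptPod field as a closure, so there is no leftover ev.preemptPod method to call by mistake. A minimal standalone sketch of the same mutex-guarded set pattern follows; the tracker/newTracker names are hypothetical and this is not the actual Evaluator code:

    package main

    import (
        "fmt"
        "sync"

        "k8s.io/apimachinery/pkg/types"
        "k8s.io/apimachinery/pkg/util/sets"
    )

    // tracker mimics the Evaluator's preempting bookkeeping: a set of pod UIDs
    // guarded by an RWMutex so concurrent goroutines can record and query
    // in-flight preemptions safely.
    type tracker struct {
        mu         sync.RWMutex
        preempting sets.Set[types.UID]
    }

    func newTracker() *tracker {
        return &tracker{preempting: sets.New[types.UID]()}
    }

    // start records that a pod has begun triggering preemption.
    func (t *tracker) start(uid types.UID) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.preempting.Insert(uid)
    }

    // done removes the pod once its preemption API calls have finished.
    func (t *tracker) done(uid types.UID) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.preempting.Delete(uid)
    }

    // isPreempting reports whether a pod is currently running preemption.
    func (t *tracker) isPreempting(uid types.UID) bool {
        t.mu.RLock()
        defer t.mu.RUnlock()
        return t.preempting.Has(uid)
    }

    func main() {
        tr := newTracker()
        tr.start("uid-1")
        fmt.Println(tr.isPreempting("uid-1")) // true
        tr.done("uid-1")
        fmt.Println(tr.isPreempting("uid-1")) // false
    }

Compared with a raw map, sets.Set gives Insert/Delete/Has directly and reads the same way at every call site, which is presumably why the field was converted.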
diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go
index 1163e0233f2..e633267a515 100644
--- a/test/integration/scheduler/preemption/preemption_test.go
+++ b/test/integration/scheduler/preemption/preemption_test.go
@@ -669,7 +669,7 @@ func TestAsyncPreemption(t *testing.T) {
 			},
 		},
 		{
-			name: "Higher priority Pod takes over the place for higher priority Pod that is running the preemption",
+			name: "Higher priority Pod takes over the place for lower priority Pod that is running the preemption",
 			scenarios: []scenario{
 				{
 					name: "create scheduled Pod",
@@ -864,11 +864,7 @@ func TestAsyncPreemption(t *testing.T) {
 				if ch, ok := preemptionDoneChannels[preemptor.Name]; ok {
 					<-ch
 				}
-				err := preemptPodFn(ctx, c, preemptor, victim, pluginName)
-				if err != nil {
-					return err
-				}
-				return nil
+				return preemptPodFn(ctx, c, preemptor, victim, pluginName)
 			}

 			return preemptionPlugin, nil
@@ -944,17 +940,22 @@ func TestAsyncPreemption(t *testing.T) {
 					createdPods = append(createdPods, pod)
 				}
 			case scenario.schedulePod != nil:
+				lastFailure := ""
 				if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
-					activePods := testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()
-					return len(activePods) != 0, nil
-				}); err != nil {
-					t.Fatalf("Expected the pod %s to be scheduled, but no pod arrives at the activeQ", scenario.schedulePod.podName)
-				}
+					if len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == 0 {
+						lastFailure = fmt.Sprintf("Expected the pod %s to be scheduled, but no pod arrives at the activeQ", scenario.schedulePod.podName)
+						return false, nil
+					}

-				if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
-					return testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name == scenario.schedulePod.podName, nil
+					if testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name != scenario.schedulePod.podName {
+						// need to wait more because maybe the queue will get another Pod that has higher priority than the current top pod.
+						lastFailure = fmt.Sprintf("The pod %s is expected to be scheduled, but the top Pod is %s", scenario.schedulePod.podName, testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name)
+						return false, nil
+					}
+
+					return true, nil
 				}); err != nil {
-					t.Fatalf("The pod %s is expected to be scheduled, but the top Pod is %s", scenario.schedulePod.podName, testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name)
+					t.Fatal(lastFailure)
 				}

 				preemptionDoneChannels[scenario.schedulePod.podName] = make(chan struct{})
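Note: in the plugin factory above, the test's wrapper around PreemptPod blocks on preemptionDoneChannels[preemptor.Name] before delegating, and the old err-check-and-return body collapses to a single return statement. Gating a hook on a channel like this lets the test hold an asynchronous preemption at a known point, assert on intermediate state, and then release it. A standalone sketch of that gating idea follows; the gate/work names are hypothetical and this is not the real test harness:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        gate := make(chan struct{}) // closed when the test wants work to proceed
        var wg sync.WaitGroup

        work := func() { fmt.Println("preemption API calls happen here") }

        // gated wraps work so it cannot run until the gate is released,
        // mirroring how the test wraps PreemptPod with a receive on a
        // per-preemptor channel.
        gated := func() {
            <-gate
            work()
        }

        wg.Add(1)
        go func() {
            defer wg.Done()
            gated()
        }()

        // ...the test would assert on intermediate state here...
        close(gate) // release the gate so the async work can finish
        wg.Wait()
    }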
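Note: the last hunk folds two poll loops into one and records the most recent failure reason in lastFailure inside the condition, so a timeout reports why the condition never held rather than re-reading (and possibly re-racing) the queue inside the Fatalf call. A minimal sketch of this poll-and-remember pattern using the same wait helper follows; pollWithReason and its condition are hypothetical, not the actual test code:

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    // pollWithReason polls cond until it succeeds or the timeout expires,
    // remembering the last human-readable failure so the caller can report
    // it on timeout instead of a bare "context deadline exceeded".
    func pollWithReason(ctx context.Context, cond func() (bool, string)) error {
        lastFailure := ""
        err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, 5*time.Second, false,
            func(ctx context.Context) (bool, error) {
                ok, reason := cond()
                if !ok {
                    lastFailure = reason // keep only the most recent reason
                    return false, nil    // nil error: keep polling
                }
                return true, nil
            })
        if err != nil {
            return fmt.Errorf("%s: %w", lastFailure, err)
        }
        return nil
    }

    func main() {
        start := time.Now()
        err := pollWithReason(context.Background(), func() (bool, string) {
            if time.Since(start) < time.Second {
                return false, "queue is still empty"
            }
            return true, ""
        })
        fmt.Println(err) // <nil> once the condition eventually holds
    }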