diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 55429319f4b..fe387e8f6a9 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -580,6 +580,14 @@ const ( // which benefits to reduce the useless requeueing. SchedulerQueueingHints featuregate.Feature = "SchedulerQueueingHints" + // owner: @sanposhiho + // kep: http://kep.k8s.io/4832 + // alpha: v1.32 + // + // Runs some expensive operations within the scheduler's preemption asynchronously, + // which improves the scheduling latency when preemption is involved. + SchedulerAsyncPreemption featuregate.Feature = "SchedulerAsyncPreemption" + // owner: @atosatto @yuanchen8911 // kep: http://kep.k8s.io/3902 // diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index ced4f0e944e..55f97ba6d80 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -633,6 +633,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Alpha}, }, + SchedulerAsyncPreemption: { + {Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha}, + }, + SchedulerQueueingHints: { {Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Beta}, {Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta}, diff --git a/pkg/scheduler/apis/config/testing/defaults/defaults.go b/pkg/scheduler/apis/config/testing/defaults/defaults.go index c83668c9604..a24a498272d 100644 --- a/pkg/scheduler/apis/config/testing/defaults/defaults.go +++ b/pkg/scheduler/apis/config/testing/defaults/defaults.go @@ -52,6 +52,7 @@ var ExpandedPluginsV1 = &config.Plugins{ PreEnqueue: config.PluginSet{ Enabled: []config.Plugin{ {Name: names.SchedulingGates}, + {Name: names.DefaultPreemption}, }, }, QueueSort: config.PluginSet{ diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 66e7334f61a..06f25037bb7 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -92,9 +92,11 @@ type PreEnqueueCheck func(pod *v1.Pod) bool type SchedulingQueue interface { framework.PodNominator Add(logger klog.Logger, pod *v1.Pod) - // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. - // The passed-in pods are originally compiled from plugins that want to activate Pods, - // by injecting the pods through a reserved CycleState struct (PodsToActivate). + // Activate moves the given pods to activeQ. + // If a pod isn't found in unschedulablePods or backoffQ and it's in-flight, + // the wildcard event is registered so that the pod will be requeued when it comes back. + // But, if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., completely unknown pod), + // Activate would ignore the pod. Activate(logger klog.Logger, pods map[string]*v1.Pod) // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue. // The podSchedulingCycle represents the current scheduling cycle number which can be @@ -411,9 +413,22 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework } if event.IsWildCard() { + // If the wildcard event has a Pod in newObj, + // it indicates that the event is intended to be effective only for that Pod. + // Specifically, EventForceActivate could have a target Pod in newObj.
+ if newObj != nil { + if pod, ok := newObj.(*v1.Pod); !ok || pod.UID != pInfo.Pod.UID { + // This wildcard event is not for this Pod. + if ok { + logger.V(6).Info("Not worth requeuing because the event is wildcard, but for another pod", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "newObj", klog.KObj(pod)) + } + return queueSkip + } + } + // If the wildcard event is special one as someone wants to force all Pods to move to activeQ/backoffQ. // We return queueAfterBackoff in this case, while resetting all blocked plugins. - logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod)) + logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod), "event", event.Label()) return queueAfterBackoff } @@ -590,7 +605,11 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) { } } -// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. +// Activate moves the given pods to activeQ. +// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight, +// the wildcard event is registered so that the pod will be requeued when it comes back. +// But, if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., completely unknown pod), +// Activate would ignore the pod. func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) { p.lock.Lock() defer p.lock.Unlock() @@ -599,7 +618,15 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) { for _, pod := range pods { if p.activate(logger, pod) { activated = true + continue } + + // If this pod is in-flight, register the activation event (for when QHint is enabled) or update moveRequestCycle (for when QHints is disabled) + // so that the pod will be requeued when it comes back. + // Specifically in the in-tree plugins, this is for the scenario with the preemption plugin + // where the async preemption API calls are all done or fail at some point before the Pod comes back to the queue. 
+ p.activeQ.addEventsIfPodInFlight(nil, pod, []framework.ClusterEvent{framework.EventForceActivate}) + p.moveRequestCycle = p.activeQ.schedulingCycle() } if activated { diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index 0f732af37e5..caf72131696 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -1294,13 +1294,17 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_Activate(t *testing.T) { + metrics.Register() tests := []struct { name string qPodInfoInUnschedulablePods []*framework.QueuedPodInfo qPodInfoInPodBackoffQ []*framework.QueuedPodInfo qPodInActiveQ []*v1.Pod qPodInfoToActivate *framework.QueuedPodInfo + qPodInInFlightPod *v1.Pod + expectedInFlightEvent *clusterEvent want []*framework.QueuedPodInfo + qHintEnabled bool }{ { name: "pod already in activeQ", @@ -1313,6 +1317,21 @@ func TestPriorityQueue_Activate(t *testing.T) { qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, want: []*framework.QueuedPodInfo{}, }, + { + name: "[QHint] pod not in unschedulablePods/podBackoffQ but in-flight", + qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, + qPodInInFlightPod: highPriNominatedPodInfo.Pod, + expectedInFlightEvent: &clusterEvent{oldObj: (*v1.Pod)(nil), newObj: highPriNominatedPodInfo.Pod, event: framework.EventForceActivate}, + want: []*framework.QueuedPodInfo{}, + qHintEnabled: true, + }, + { + name: "[QHint] pod not in unschedulablePods/podBackoffQ and not in-flight", + qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, + qPodInInFlightPod: medPriorityPodInfo.Pod, // different pod is in-flight + want: []*framework.QueuedPodInfo{}, + qHintEnabled: true, + }, { name: "pod in unschedulablePods", qPodInfoInUnschedulablePods: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, @@ -1329,12 +1348,30 @@ func TestPriorityQueue_Activate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, tt.qHintEnabled) var objs []runtime.Object logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) + if tt.qPodInInFlightPod != nil { + // Put -> Pop the Pod to make it registered in inFlightPods. 
+ q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(tt.qPodInInFlightPod)) + }) + p, err := q.activeQ.pop(logger) + if err != nil { + t.Fatalf("Pop failed: %v", err) + } + if p.Pod.Name != tt.qPodInInFlightPod.Name { + t.Errorf("Unexpected popped pod: %v", p.Pod.Name) + } + if len(q.activeQ.listInFlightEvents()) != 1 { + t.Fatal("Expected the pod to be recorded in in-flight events, but it doesn't") + } + } + // Prepare activeQ/unschedulablePods/podBackoffQ according to the table for _, qPod := range tt.qPodInActiveQ { q.Add(logger, qPod) @@ -1353,7 +1390,29 @@ func TestPriorityQueue_Activate(t *testing.T) { // Check the result after activation by the length of activeQ if wantLen := len(tt.want); q.activeQ.len() != wantLen { - t.Errorf("length compare: want %v, got %v", wantLen, q.activeQ.len()) + t.Fatalf("length compare: want %v, got %v", wantLen, q.activeQ.len()) + } + + if tt.expectedInFlightEvent != nil { + if len(q.activeQ.listInFlightEvents()) != 2 { + t.Fatalf("Expected two in-flight event to be recorded, but got %v events", len(q.activeQ.listInFlightEvents())) + } + found := false + for _, e := range q.activeQ.listInFlightEvents() { + event, ok := e.(*clusterEvent) + if !ok { + continue + } + + if d := cmp.Diff(tt.expectedInFlightEvent, event, cmpopts.EquateComparable(clusterEvent{})); d != "" { + t.Fatalf("Unexpected in-flight event (-want, +got):\n%s", d) + } + found = true + } + + if !found { + t.Fatalf("Expected in-flight event to be recorded, but it wasn't.") + } } // Check if the specific pod exists in activeQ @@ -3779,6 +3838,7 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo { // Test_isPodWorthRequeuing tests isPodWorthRequeuing function. func Test_isPodWorthRequeuing(t *testing.T) { + metrics.Register() count := 0 queueHintReturnQueue := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { count++ @@ -3857,11 +3917,37 @@ func Test_isPodWorthRequeuing(t *testing.T) { }, event: framework.EventUnschedulableTimeout, oldObj: nil, - newObj: st.MakeNode().Obj(), + newObj: nil, expected: queueAfterBackoff, expectedExecutionCount: 0, queueingHintMap: QueueingHintMapPerProfile{}, }, + { + name: "return Queue when the event is wildcard and the wildcard targets the pod to be requeued right now", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: framework.EventForceActivate, + oldObj: nil, + newObj: st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj(), + expected: queueAfterBackoff, + expectedExecutionCount: 0, + queueingHintMap: QueueingHintMapPerProfile{}, + }, + { + name: "return Skip when the event is wildcard, but the wildcard targets a different pod", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: framework.EventForceActivate, + oldObj: nil, + newObj: st.MakePod().Name("pod-different").Namespace("ns2").UID("2").Obj(), + expected: queueSkip, + expectedExecutionCount: 0, + queueingHintMap: QueueingHintMapPerProfile{}, + }, { name: "interprets Queue from the Pending plugin as queueImmediately", podInfo: &framework.QueuedPodInfo{ diff --git a/pkg/scheduler/framework/events.go b/pkg/scheduler/framework/events.go index e6b628028c5..910c254c520 100644 --- a/pkg/scheduler/framework/events.go 
+++ b/pkg/scheduler/framework/events.go @@ -34,6 +34,9 @@ const ( // ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ // to activeQ. Usually it's triggered by plugin implementations. ForceActivate = "ForceActivate" + // UnschedulableTimeout is the event when a pod is moved from unschedulablePods + // due to the timeout specified at pod-max-in-unschedulable-pods-duration. + UnschedulableTimeout = "UnschedulableTimeout" ) var ( @@ -50,7 +53,9 @@ var ( // EventUnscheduledPodDelete is the event when an unscheduled pod is deleted. EventUnscheduledPodDelete = ClusterEvent{Resource: unschedulablePod, ActionType: Delete} // EventUnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout. - EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: "UnschedulableTimeout"} + EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: UnschedulableTimeout} + // EventForceActivate is the event when a pod is moved from unschedulablePods/backoffQ to activeQ. + EventForceActivate = ClusterEvent{Resource: WildCard, ActionType: All, label: ForceActivate} ) // PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s). diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 285973e249f..433de93f2ad 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -770,6 +770,8 @@ type Framework interface { // SetPodNominator sets the PodNominator SetPodNominator(nominator PodNominator) + // SetPodActivator sets the PodActivator + SetPodActivator(activator PodActivator) // Close calls Close method of each plugin. Close() error @@ -783,6 +785,8 @@ type Handle interface { PodNominator // PluginsRunner abstracts operations to run some plugins. PluginsRunner + // PodActivator abstracts operations in the scheduling queue. + PodActivator // SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot // is taken at the beginning of a scheduling cycle and remains unchanged until // a pod finishes "Permit" point. @@ -896,6 +900,16 @@ func (ni *NominatingInfo) Mode() NominatingMode { return ni.NominatingMode } +// PodActivator abstracts operations in the scheduling queue. +type PodActivator interface { + // Activate moves the given pods to activeQ. + // If a pod isn't found in unschedulablePods or backoffQ and it's in-flight, + // the wildcard event is registered so that the pod will be requeued when it comes back. + // But, if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., completely unknown pod), + // Activate would ignore the pod. + Activate(logger klog.Logger, pods map[string]*v1.Pod) +} + // PodNominator abstracts operations to maintain nominated Pods. 
type PodNominator interface { // AddNominatedPod adds the given pod to the nominator or diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index 1d573973e3e..c9144cb175e 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -53,9 +53,11 @@ type DefaultPreemption struct { args config.DefaultPreemptionArgs podLister corelisters.PodLister pdbLister policylisters.PodDisruptionBudgetLister + Evaluator *preemption.Evaluator } var _ framework.PostFilterPlugin = &DefaultPreemption{} +var _ framework.PreEnqueuePlugin = &DefaultPreemption{} // Name returns name of the plugin. It is used in logs, etc. func (pl *DefaultPreemption) Name() string { @@ -71,13 +73,19 @@ func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feat if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil { return nil, err } + + podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister() + pdbLister := getPDBLister(fh.SharedInformerFactory()) + pl := DefaultPreemption{ fh: fh, fts: fts, args: *args, - podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(), - pdbLister: getPDBLister(fh.SharedInformerFactory()), + podLister: podLister, + pdbLister: pdbLister, } + pl.Evaluator = preemption.NewEvaluator(Name, fh, &pl, fts.EnableAsyncPreemption) + return &pl, nil } @@ -87,16 +95,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy metrics.PreemptionAttempts.Inc() }() - pe := preemption.Evaluator{ - PluginName: names.DefaultPreemption, - Handler: pl.fh, - PodLister: pl.podLister, - PdbLister: pl.pdbLister, - State: state, - Interface: pl, - } - - result, status := pe.Preempt(ctx, pod, m) + result, status := pl.Evaluator.Preempt(ctx, state, pod, m) msg := status.Message() if len(msg) > 0 { return result, framework.NewStatus(status.Code(), "preemption: "+msg) @@ -104,6 +103,24 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy return result, status } +func (pl *DefaultPreemption) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + if !pl.fts.EnableAsyncPreemption { + return nil + } + if pl.Evaluator.IsPodRunningPreemption(p.GetUID()) { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for the preemption for this pod to be finished") + } + return nil +} + +// EventsToRegister returns the possible events that may make a Pod +// failed by this plugin schedulable. +func (pl *DefaultPreemption) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // The plugin moves the preemptor Pod to acviteQ/backoffQ once the preemption API calls are all done, + // and we don't need to move the Pod with any events. + return nil, nil +} + // calculateNumCandidates returns the number of candidates the FindCandidates // method must produce from dry running based on the constraints given by // and . 
The number of diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 89192d4ffca..a1d87dcd064 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -25,6 +25,7 @@ import ( "math/rand" "sort" "strings" + "sync" "testing" "time" @@ -37,10 +38,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1" @@ -436,6 +439,7 @@ func TestPostFilter(t *testing.T) { pdbLister: getPDBLister(informerFactory), args: *getDefaultDefaultPreemptionArgs(), } + p.Evaluator = preemption.NewEvaluator(names.DefaultPreemption, f, &p, false) state := framework.NewCycleState() // Ensure is populated. @@ -1206,11 +1210,10 @@ func TestDryRunPreemption(t *testing.T) { Handler: pl.fh, PodLister: pl.podLister, PdbLister: pl.pdbLister, - State: state, Interface: pl, } offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos))) - got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, tt.pdbs, offset, numCandidates) + got, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, tt.pdbs, offset, numCandidates) // Sort the values (inner victims) and the candidate itself (by its NominatedNodeName). for i := range got { victims := got[i].Victims().Pods @@ -1447,11 +1450,10 @@ func TestSelectBestCandidate(t *testing.T) { Handler: pl.fh, PodLister: pl.podLister, PdbLister: pl.pdbLister, - State: state, Interface: pl, } offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos))) - candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates) + candidates, _, _ := pe.DryRunPreemption(ctx, state, tt.pod, nodeInfos, nil, offset, numCandidates) s := pe.SelectCandidate(ctx, candidates) if s == nil || len(s.Name()) == 0 { return @@ -1548,7 +1550,9 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { }) } } + func TestPreempt(t *testing.T) { + metrics.Register() tests := []struct { name string pod *v1.Pod @@ -1713,197 +1717,235 @@ func TestPreempt(t *testing.T) { } labelKeys := []string{"hostname", "zone", "region"} - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client := clientsetfake.NewClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) - podInformer := informerFactory.Core().V1().Pods().Informer() - podInformer.GetStore().Add(test.pod) - for i := range test.pods { - podInformer.GetStore().Add(test.pods[i]) - } - - deletedPodNames := sets.New[string]() - patchedPodNames := sets.New[string]() - patchedPods := []*v1.Pod{} - client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - patchAction := action.(clienttesting.PatchAction) - podName := patchAction.GetName() - namespace := patchAction.GetNamespace() - patch := patchAction.GetPatch() - pod, err := informerFactory.Core().V1().Pods().Lister().Pods(namespace).Get(podName) - if err != nil { - t.Fatalf("Failed to get the original pod %s/%s before patching: %v\n", namespace, podName, 
err) - } - marshalledPod, err := json.Marshal(pod) - if err != nil { - t.Fatalf("Failed to marshal the original pod %s/%s: %v", namespace, podName, err) - } - updated, err := strategicpatch.StrategicMergePatch(marshalledPod, patch, v1.Pod{}) - if err != nil { - t.Fatalf("Failed to apply strategic merge patch %q on pod %#v: %v", patch, marshalledPod, err) - } - updatedPod := &v1.Pod{} - if err := json.Unmarshal(updated, updatedPod); err != nil { - t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err) - } - patchedPods = append(patchedPods, updatedPod) - patchedPodNames.Insert(podName) - return true, nil, nil - }) - client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName()) - return true, nil, nil - }) - - logger, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - waitingPods := frameworkruntime.NewWaitingPodsMap() - - cache := internalcache.New(ctx, time.Duration(0)) - for _, pod := range test.pods { - cache.AddPod(logger, pod) - } - cachedNodeInfoMap := map[string]*framework.NodeInfo{} - nodes := make([]*v1.Node, len(test.nodeNames)) - for i, name := range test.nodeNames { - node := st.MakeNode().Name(name).Capacity(veryLargeRes).Obj() - // Split node name by '/' to form labels in a format of - // {"hostname": node.Name[0], "zone": node.Name[1], "region": node.Name[2]} - node.ObjectMeta.Labels = make(map[string]string) - for i, label := range strings.Split(node.Name, "/") { - node.ObjectMeta.Labels[labelKeys[i]] = label - } - node.Name = node.ObjectMeta.Labels["hostname"] - cache.AddNode(logger, node) - nodes[i] = node - - // Set nodeInfo to extenders to mock extenders' cache for preemption. - cachedNodeInfo := framework.NewNodeInfo() - cachedNodeInfo.SetNode(node) - cachedNodeInfoMap[node.Name] = cachedNodeInfo - } - var extenders []framework.Extender - for _, extender := range test.extenders { - // Set nodeInfoMap as extenders cached node information. - extender.CachedNodeNameToInfo = cachedNodeInfoMap - extenders = append(extenders, extender) - } - fwk, err := tf.NewFramework( - ctx, - []tf.RegisterPluginFunc{ - test.registerPlugin, - tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - }, - "", - frameworkruntime.WithClientSet(client), - frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), - frameworkruntime.WithExtenders(extenders), - frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), - frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), - frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithWaitingPods(waitingPods), - frameworkruntime.WithLogger(logger), - ) - if err != nil { - t.Fatal(err) - } - - state := framework.NewCycleState() - // Some tests rely on PreFilter plugin to compute its CycleState. - if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() { - t.Errorf("Unexpected preFilterStatus: %v", s) - } - // Call preempt and check the expected results. 
- pl := DefaultPreemption{ - fh: fwk, - podLister: informerFactory.Core().V1().Pods().Lister(), - pdbLister: getPDBLister(informerFactory), - args: *getDefaultDefaultPreemptionArgs(), - } - - pe := preemption.Evaluator{ - PluginName: names.DefaultPreemption, - Handler: pl.fh, - PodLister: pl.podLister, - PdbLister: pl.pdbLister, - State: state, - Interface: &pl, - } - - // so that these nodes are eligible for preemption, we set their status - // to Unschedulable. - - nodeToStatusMap := framework.NewDefaultNodeToStatus() - for _, n := range nodes { - nodeToStatusMap.Set(n.Name, framework.NewStatus(framework.Unschedulable)) - } - - res, status := pe.Preempt(ctx, test.pod, nodeToStatusMap) - if !status.IsSuccess() && !status.IsRejected() { - t.Errorf("unexpected error in preemption: %v", status.AsError()) - } - if diff := cmp.Diff(test.want, res); diff != "" { - t.Errorf("Unexpected status (-want, +got):\n%s", diff) - } - if len(deletedPodNames) != len(test.expectedPods) { - t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames)) - } - if diff := cmp.Diff(sets.List(patchedPodNames), sets.List(deletedPodNames)); diff != "" { - t.Errorf("unexpected difference in the set of patched and deleted pods: %s", diff) - } - - // Make sure that the DisruptionTarget condition has been added to the pod status - for _, patchedPod := range patchedPods { - expectedPodCondition := &v1.PodCondition{ - Type: v1.DisruptionTarget, - Status: v1.ConditionTrue, - Reason: v1.PodReasonPreemptionByScheduler, - Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", patchedPod.Spec.SchedulerName), + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + client := clientsetfake.NewClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + podInformer := informerFactory.Core().V1().Pods().Informer() + testPod := test.pod.DeepCopy() + testPods := make([]*v1.Pod, len(test.pods)) + for i := range test.pods { + testPods[i] = test.pods[i].DeepCopy() } - _, condition := apipod.GetPodCondition(&patchedPod.Status, v1.DisruptionTarget) - if diff := cmp.Diff(condition, expectedPodCondition, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")); diff != "" { - t.Fatalf("unexpected difference in the pod %q DisruptionTarget condition: %s", patchedPod.Name, diff) + if err := podInformer.GetStore().Add(testPod); err != nil { + t.Fatalf("Failed to add test pod %s: %v", testPod.Name, err) } - } - - for victimName := range deletedPodNames { - found := false - for _, expPod := range test.expectedPods { - if expPod == victimName { - found = true - break + for i := range testPods { + if err := podInformer.GetStore().Add(testPods[i]); err != nil { + t.Fatalf("Failed to add test pod %s: %v", testPods[i], err) } } - if !found { - t.Errorf("pod %v is not expected to be a victim.", victimName) - } - } - if res != nil && res.NominatingInfo != nil { - test.pod.Status.NominatedNodeName = res.NominatedNodeName - } - // Manually set the deleted Pods' deletionTimestamp to non-nil. - for _, pod := range test.pods { - if deletedPodNames.Has(pod.Name) { - now := metav1.Now() - pod.DeletionTimestamp = &now - deletedPodNames.Delete(pod.Name) - } - } + // Need to protect deletedPodNames and patchedPodNames to prevent DATA RACE panic. 
+ var mu sync.RWMutex + deletedPodNames := sets.New[string]() + patchedPodNames := sets.New[string]() + patchedPods := []*v1.Pod{} + client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + patchAction := action.(clienttesting.PatchAction) + podName := patchAction.GetName() + namespace := patchAction.GetNamespace() + patch := patchAction.GetPatch() + pod, err := informerFactory.Core().V1().Pods().Lister().Pods(namespace).Get(podName) + if err != nil { + t.Fatalf("Failed to get the original pod %s/%s before patching: %v\n", namespace, podName, err) + } + marshalledPod, err := json.Marshal(pod) + if err != nil { + t.Fatalf("Failed to marshal the original pod %s/%s: %v", namespace, podName, err) + } + updated, err := strategicpatch.StrategicMergePatch(marshalledPod, patch, v1.Pod{}) + if err != nil { + t.Fatalf("Failed to apply strategic merge patch %q on pod %#v: %v", patch, marshalledPod, err) + } + updatedPod := &v1.Pod{} + if err := json.Unmarshal(updated, updatedPod); err != nil { + t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err) + } + patchedPods = append(patchedPods, updatedPod) + mu.Lock() + defer mu.Unlock() + patchedPodNames.Insert(podName) + return true, nil, nil + }) + client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + mu.Lock() + defer mu.Unlock() + deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName()) + return true, nil, nil + }) - // Call preempt again and make sure it doesn't preempt any more pods. - res, status = pe.Preempt(ctx, test.pod, framework.NewDefaultNodeToStatus()) - if !status.IsSuccess() && !status.IsRejected() { - t.Errorf("unexpected error in preemption: %v", status.AsError()) - } - if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 { - t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName) - } - }) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + waitingPods := frameworkruntime.NewWaitingPodsMap() + + cache := internalcache.New(ctx, time.Duration(0)) + for _, pod := range testPods { + if err := cache.AddPod(logger, pod.DeepCopy()); err != nil { + t.Fatalf("Failed to add pod %s: %v", pod.Name, err) + } + } + cachedNodeInfoMap := map[string]*framework.NodeInfo{} + nodes := make([]*v1.Node, len(test.nodeNames)) + for i, name := range test.nodeNames { + node := st.MakeNode().Name(name).Capacity(veryLargeRes).Obj() + // Split node name by '/' to form labels in a format of + // {"hostname": node.Name[0], "zone": node.Name[1], "region": node.Name[2]} + node.ObjectMeta.Labels = make(map[string]string) + for i, label := range strings.Split(node.Name, "/") { + node.ObjectMeta.Labels[labelKeys[i]] = label + } + node.Name = node.ObjectMeta.Labels["hostname"] + t.Logf("node is added: %v. labels: %#v", node.Name, node.ObjectMeta.Labels) + cache.AddNode(logger, node) + nodes[i] = node + + // Set nodeInfo to extenders to mock extenders' cache for preemption. + cachedNodeInfo := framework.NewNodeInfo() + cachedNodeInfo.SetNode(node) + cachedNodeInfoMap[node.Name] = cachedNodeInfo + } + var extenders []framework.Extender + for _, extender := range test.extenders { + // Set nodeInfoMap as extenders cached node information. 
+ extender.CachedNodeNameToInfo = cachedNodeInfoMap + extenders = append(extenders, extender) + } + fwk, err := tf.NewFramework( + ctx, + []tf.RegisterPluginFunc{ + test.registerPlugin, + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, + "", + frameworkruntime.WithClientSet(client), + frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), + frameworkruntime.WithExtenders(extenders), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(testPods, nodes)), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithWaitingPods(waitingPods), + frameworkruntime.WithLogger(logger), + frameworkruntime.WithPodActivator(&fakePodActivator{}), + ) + if err != nil { + t.Fatal(err) + } + + state := framework.NewCycleState() + // Some tests rely on PreFilter plugin to compute its CycleState. + if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, testPod); !s.IsSuccess() { + t.Errorf("Unexpected preFilterStatus: %v", s) + } + // Call preempt and check the expected results. + pl := DefaultPreemption{ + fh: fwk, + podLister: informerFactory.Core().V1().Pods().Lister(), + pdbLister: getPDBLister(informerFactory), + args: *getDefaultDefaultPreemptionArgs(), + } + + pe := preemption.NewEvaluator(names.DefaultPreemption, pl.fh, &pl, asyncPreemptionEnabled) + + // so that these nodes are eligible for preemption, we set their status + // to Unschedulable. + + nodeToStatusMap := framework.NewDefaultNodeToStatus() + for _, n := range nodes { + nodeToStatusMap.Set(n.Name, framework.NewStatus(framework.Unschedulable)) + } + + res, status := pe.Preempt(ctx, state, testPod, nodeToStatusMap) + if !status.IsSuccess() && !status.IsRejected() { + t.Errorf("unexpected error in preemption: %v", status.AsError()) + } + if diff := cmp.Diff(test.want, res); diff != "" { + t.Errorf("Unexpected status (-want, +got):\n%s", diff) + } + + if asyncPreemptionEnabled { + // Wait for the pod to be deleted. + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + mu.RLock() + defer mu.RUnlock() + return len(deletedPodNames) == len(test.expectedPods), nil + }); err != nil { + t.Errorf("expected %v pods to be deleted, got %v.", len(test.expectedPods), len(deletedPodNames)) + } + } else { + mu.RLock() + // If async preemption is disabled, the pod should be deleted immediately. 
+ if len(deletedPodNames) != len(test.expectedPods) { + t.Errorf("expected %v pods to be deleted, got %v.", len(test.expectedPods), len(deletedPodNames)) + } + mu.RUnlock() + } + + mu.RLock() + if diff := cmp.Diff(sets.List(patchedPodNames), sets.List(deletedPodNames)); diff != "" { + t.Errorf("unexpected difference in the set of patched and deleted pods: %s", diff) + } + + // Make sure that the DisruptionTarget condition has been added to the pod status + for _, patchedPod := range patchedPods { + expectedPodCondition := &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: v1.PodReasonPreemptionByScheduler, + Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", patchedPod.Spec.SchedulerName), + } + + _, condition := apipod.GetPodCondition(&patchedPod.Status, v1.DisruptionTarget) + if diff := cmp.Diff(condition, expectedPodCondition, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")); diff != "" { + t.Fatalf("unexpected difference in the pod %q DisruptionTarget condition: %s", patchedPod.Name, diff) + } + } + + for victimName := range deletedPodNames { + found := false + for _, expPod := range test.expectedPods { + if expPod == victimName { + found = true + break + } + } + if !found { + t.Errorf("pod %v is not expected to be a victim.", victimName) + } + } + if res != nil && res.NominatingInfo != nil { + testPod.Status.NominatedNodeName = res.NominatedNodeName + } + + // Manually set the deleted Pods' deletionTimestamp to non-nil. + for _, pod := range testPods { + if deletedPodNames.Has(pod.Name) { + now := metav1.Now() + pod.DeletionTimestamp = &now + deletedPodNames.Delete(pod.Name) + } + } + mu.RUnlock() + + // Call preempt again and make sure it doesn't preempt any more pods. + res, status = pe.Preempt(ctx, state, testPod, framework.NewDefaultNodeToStatus()) + if !status.IsSuccess() && !status.IsRejected() { + t.Errorf("unexpected error in preemption: %v", status.AsError()) + } + if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 { + t.Errorf("didn't expect any more preemption. 
Node %v is selected for preemption.", res.NominatedNodeName) + } + }) + } } } + +type fakePodActivator struct { +} + +func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {} diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go index 219e9c3e746..ff1920cca57 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -28,4 +28,5 @@ type Features struct { EnableInPlacePodVerticalScaling bool EnableSidecarContainers bool EnableSchedulingQueueHint bool + EnableAsyncPreemption bool } diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 6a2e2edbe66..009b6c89967 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -54,6 +54,7 @@ func NewInTreeRegistry() runtime.Registry { EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling), EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers), EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints), + EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption), } registry := runtime.Registry{ diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 5fa8a139385..3b5c8daff41 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -23,11 +23,15 @@ import ( "math" "sync" "sync/atomic" + "time" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1" corev1helpers "k8s.io/component-helpers/scheduling/corev1" @@ -36,6 +40,7 @@ import ( apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -125,10 +130,88 @@ type Evaluator struct { Handler framework.Handle PodLister corelisters.PodLister PdbLister policylisters.PodDisruptionBudgetLister - State *framework.CycleState + + enableAsyncPreemption bool + mu sync.RWMutex + // preempting is a set that records the pods that are currently triggering preemption asynchronously, + // which is used to prevent the pods from entering the scheduling cycle meanwhile. + preempting sets.Set[types.UID] + + // PreemptPod is a function that actually makes API calls to preempt a specific Pod. + // This is exposed to be replaced during tests. 
+ PreemptPod func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error + Interface } +func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsyncPreemption bool) *Evaluator { + podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister() + pdbLister := fh.SharedInformerFactory().Policy().V1().PodDisruptionBudgets().Lister() + + ev := &Evaluator{ + PluginName: names.DefaultPreemption, + Handler: fh, + PodLister: podLister, + PdbLister: pdbLister, + Interface: i, + enableAsyncPreemption: enableAsyncPreemption, + preempting: sets.New[types.UID](), + } + + // PreemptPod actually makes API calls to preempt a specific Pod. + // + // We implement it here directly, rather than creating a separate method like ev.preemptPod(...) + // to prevent the misuse of the PreemptPod function. + ev.PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error { + logger := klog.FromContext(ctx) + + // If the victim is a WaitingPod, send a reject message to the PermitPlugin. + // Otherwise we should delete the victim. + if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil { + waitingPod.Reject(pluginName, "preempted") + logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name()) + } else { + condition := &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: v1.PodReasonPreemptionByScheduler, + Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName), + } + newStatus := victim.Status.DeepCopy() + updated := apipod.UpdatePodCondition(newStatus, condition) + if updated { + if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil { + logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) + return err + } + } + if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) + return err + } + logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) + return nil + } + logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) + } + + ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name()) + + return nil + } + + return ev +} + +// IsPodRunningPreemption returns true if the pod is currently triggering preemption asynchronously. +func (ev *Evaluator) IsPodRunningPreemption(podUID types.UID) bool { + ev.mu.RLock() + defer ev.mu.RUnlock() + + return ev.preempting.Has(podUID) +} + // Preempt returns a PostFilterResult carrying suggested nominatedNodeName, along with a Status. // The semantics of returned varies on different scenarios: // @@ -145,7 +228,7 @@ type Evaluator struct { // // - . It's the regular happy path // and the non-empty nominatedNodeName will be applied to the preemptor pod. 
-func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) { +func (ev *Evaluator) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) { logger := klog.FromContext(ctx) // 0) Fetch the latest version of . @@ -171,7 +254,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT if err != nil { return nil, framework.AsStatus(err) } - candidates, nodeToStatusMap, err := ev.findCandidates(ctx, allNodes, pod, m) + candidates, nodeToStatusMap, err := ev.findCandidates(ctx, state, allNodes, pod, m) if err != nil && len(candidates) == 0 { return nil, framework.AsStatus(err) } @@ -203,9 +286,15 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption") } + logger.V(2).Info("the target node for the preemption is determined", "node", bestCandidate.Name(), "pod", klog.KObj(pod)) + // 5) Perform preparation work before nominating the selected candidate. - if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() { - return nil, status + if ev.enableAsyncPreemption { + ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName) + } else { + if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() { + return nil, status + } } return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success) @@ -213,7 +302,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT // FindCandidates calculates a slice of preemption candidates. // Each candidate is executable to make the given schedulable. -func (ev *Evaluator) findCandidates(ctx context.Context, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) { +func (ev *Evaluator) findCandidates(ctx context.Context, state *framework.CycleState, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) { if len(allNodes) == 0 { return nil, nil, errors.New("no nodes available") } @@ -239,7 +328,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, allNodes []*framework.N } offset, candidatesNum := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes))) - return ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, candidatesNum) + return ev.DryRunPreemption(ctx, state, pod, potentialNodes, pdbs, offset, candidatesNum) } // callExtenders calls given to select the list of feasible candidates. @@ -347,41 +436,11 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. defer cancel() logger := klog.FromContext(ctx) errCh := parallelize.NewErrorChannel() - preemptPod := func(index int) { - victim := c.Victims().Pods[index] - // If the victim is a WaitingPod, send a reject message to the PermitPlugin. - // Otherwise we should delete the victim. 
- if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil { - waitingPod.Reject(pluginName, "preempted") - logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name()) - } else { - condition := &v1.PodCondition{ - Type: v1.DisruptionTarget, - Status: v1.ConditionTrue, - Reason: v1.PodReasonPreemptionByScheduler, - Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName), - } - newStatus := victim.Status.DeepCopy() - updated := apipod.UpdatePodCondition(newStatus, condition) - if updated { - if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil { - logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) - errCh.SendErrorWithCancel(err, cancel) - return - } - } - if err := util.DeletePod(ctx, cs, victim); err != nil { - logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) - errCh.SendErrorWithCancel(err, cancel) - return - } - logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name()) + fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) { + if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil { + errCh.SendErrorWithCancel(err, cancel) } - - fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", pod.UID, c.Name()) - } - - fh.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName) + }, ev.PluginName) if err := errCh.ReceiveError(); err != nil { return framework.AsStatus(err) } @@ -401,6 +460,91 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. return nil } +// prepareCandidateAsync triggers a goroutine for some preparation work: +// - Evict the victim pods +// - Reject the victim pods if they are in waitingPod map +// - Clear the low-priority pods' nominatedNodeName status if needed +// The Pod won't be retried until the goroutine triggered here completes. +// +// See http://kep.k8s.io/4832 for how the async preemption works. +func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) { + metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods))) + + // Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx, + // because this process could continue even after this scheduling cycle finishes. + ctx, cancel := context.WithCancel(context.Background()) + errCh := parallelize.NewErrorChannel() + preemptPod := func(index int) { + victim := c.Victims().Pods[index] + if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil { + errCh.SendErrorWithCancel(err, cancel) + } + } + + ev.mu.Lock() + ev.preempting.Insert(pod.UID) + ev.mu.Unlock() + + logger := klog.FromContext(ctx) + go func() { + startTime := time.Now() + result := metrics.GoroutineResultSuccess + defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) + defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() + defer func() { + if result == metrics.GoroutineResultError { + // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. + // So, we should move the Pod to the activeQ. 
+ ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) + } + }() + defer cancel() + logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) + + // Lower priority pods nominated to run on this node, may no longer fit on + // this node. So, we should remove their nomination. Removing their + // nomination updates these pods and moves them to the active queue. It + // lets scheduler find another place for them. + nominatedPods := getLowerPriorityNominatedPods(logger, ev.Handler, pod, c.Name()) + if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), nominatedPods...); err != nil { + logger.Error(err, "Cannot clear 'NominatedNodeName' field from lower priority pods on the same target node", "node", c.Name()) + result = metrics.GoroutineResultError + // We do not return as this error is not critical. + } + + if len(c.Victims().Pods) == 0 { + ev.mu.Lock() + delete(ev.preempting, pod.UID) + ev.mu.Unlock() + + return + } + + // We can evict all victims in parallel, but the last one. + // We have to remove the pod from the preempting map before the last one is evicted + // because, otherwise, the pod removal might be notified to the scheduling queue before + // we remove this pod from the preempting map, + // and the pod could end up stucking at the unschedulable pod pool + // by all the pod removal events being ignored. + ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) + if err := errCh.ReceiveError(); err != nil { + logger.Error(err, "Error occurred during async preemption") + result = metrics.GoroutineResultError + } + + ev.mu.Lock() + delete(ev.preempting, pod.UID) + ev.mu.Unlock() + + if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil { + logger.Error(err, "Error occurred during async preemption") + result = metrics.GoroutineResultError + } + + logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) + }() +} + func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) { if pdbLister != nil { return pdbLister.List(labels.Everything()) @@ -538,7 +682,7 @@ func getLowerPriorityNominatedPods(logger klog.Logger, pn framework.PodNominator // The number of candidates depends on the constraints defined in the plugin's args. In the returned list of // candidates, ones that do not violate PDB are preferred over ones that do. // NOTE: This method is exported for easier testing in default preemption. 
-func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo, +func (ev *Evaluator) DryRunPreemption(ctx context.Context, state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo, pdbs []*policy.PodDisruptionBudget, offset int32, candidatesNum int32) ([]Candidate, *framework.NodeToStatus, error) { fh := ev.Handler @@ -557,7 +701,7 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Snapshot() logger.V(5).Info("Check the potential node for preemption", "node", nodeInfoCopy.Node().Name) - stateCopy := ev.State.Clone() + stateCopy := state.Clone() pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs) if status.IsSuccess() && len(pods) != 0 { victims := extenderv1.Victims{ diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 467671bc669..22473be4f8f 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -20,18 +20,26 @@ import ( "context" "errors" "fmt" + "reflect" "sort" + "sync" "testing" + "time" "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" @@ -81,6 +89,19 @@ func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVi return nil } +type fakePodActivator struct { + activatedPods map[string]*v1.Pod + mu *sync.RWMutex +} + +func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) { + f.mu.Lock() + defer f.mu.Unlock() + for name, pod := range pods { + f.activatedPods[name] = pod + } +} + type FakePreemptionScorePostFilterPlugin struct{} func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode( @@ -243,9 +264,8 @@ func TestDryRunPreemption(t *testing.T) { PluginName: "FakePostFilter", Handler: fwk, Interface: fakePostPlugin, - State: state, } - got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, nil, 0, int32(len(nodeInfos))) + got, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, nil, 0, int32(len(nodeInfos))) // Sort the values (inner victims) and the candidate itself (by its NominatedNodeName). for i := range got { victims := got[i].Victims().Pods @@ -344,9 +364,8 @@ func TestSelectCandidate(t *testing.T) { PluginName: "FakePreemptionScorePostFilter", Handler: fwk, Interface: fakePreemptionScorePostFilterPlugin, - State: state, } - candidates, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, nil, 0, int32(len(nodeInfos))) + candidates, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, nil, 0, int32(len(nodeInfos))) s := pe.SelectCandidate(ctx, candidates) if s == nil || len(s.Name()) == 0 { t.Errorf("expect any node in %v, but no candidate selected", tt.expected) @@ -393,6 +412,11 @@ func TestPrepareCandidate(t *testing.T) { Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). 
Obj() + failVictim = st.MakePod().Name("fail-victim").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + victim2 = st.MakePod().Name("victim2").UID("victim2"). Node(node1Name).SchedulerName(defaultSchedulerName).Priority(50000). Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). @@ -404,6 +428,12 @@ func TestPrepareCandidate(t *testing.T) { Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). Obj() + failVictim1WithMatchingCondition = st.MakePod().Name("fail-victim").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Conditions([]v1.PodCondition{condition}). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + preemptor = st.MakePod().Name("preemptor").UID("preemptor"). SchedulerName(defaultSchedulerName).Priority(highPriority). Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). @@ -411,15 +441,23 @@ func TestPrepareCandidate(t *testing.T) { ) tests := []struct { - name string - nodeNames []string - candidate *fakeCandidate - preemptor *v1.Pod - testPods []*v1.Pod + name string + nodeNames []string + candidate *fakeCandidate + preemptor *v1.Pod + testPods []*v1.Pod + expectedDeletedPods []string + expectedDeletionError bool + expectedPatchError bool + // Only compared when async preemption is disabled. expectedStatus *framework.Status + // Only compared when async preemption is enabled. + expectedPreemptingMap sets.Set[types.UID] + expectedActivatedPods map[string]*v1.Pod }{ { name: "no victims", + candidate: &fakeCandidate{ victims: &extenderv1.Victims{}, }, @@ -427,11 +465,13 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { name: "one victim without condition", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -444,11 +484,14 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { name: "one victim with same condition", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -461,11 +504,14 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1WithMatchingCondition, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { - name: "one victim, but patch pod failed (not found victim1 pod)", + name: "one victim, not-found victim error is ignored when patching", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -474,13 +520,35 @@ func TestPrepareCandidate(t *testing.T) { }, }, }, - preemptor: preemptor, - testPods: []*v1.Pod{}, - nodeNames: []string{node1Name}, - expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: 
sets.New(types.UID("preemptor")), }, { - name: "one victim, but delete pod failed (not found victim1 pod)", + name: "one victim, but pod deletion failed", + + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + failVictim1WithMatchingCondition, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + expectedDeletionError: true, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("delete pod failed")), + expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, + }, + { + name: "one victim, not-found victim error is ignored when deleting", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -489,18 +557,40 @@ func TestPrepareCandidate(t *testing.T) { }, }, }, - preemptor: preemptor, - testPods: []*v1.Pod{}, - nodeNames: []string{node1Name}, - expectedStatus: framework.AsStatus(errors.New("delete pod failed")), + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { - name: "two victims without condition, one passes successfully and the second fails (not found victim2 pod)", + name: "one victim, but patch pod failed", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ Pods: []*v1.Pod{ - victim1, + failVictim, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + expectedPatchError: true, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, + }, + { + name: "two victims without condition, one passes successfully and the second fails", + + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + failVictim, victim2, }, }, @@ -509,70 +599,157 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + nodeNames: []string{node1Name}, + expectedPatchError: true, + expectedDeletedPods: []string{"victim2"}, + expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + expectedPreemptingMap: sets.New(types.UID("preemptor")), + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - metrics.Register() - logger, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, tt := range tests { + t.Run(fmt.Sprintf("%v (Async preemption enabled: %v)", tt.name, asyncPreemptionEnabled), func(t *testing.T) { + metrics.Register() + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() - nodes := make([]*v1.Node, len(tt.nodeNames)) - for i, nodeName := range tt.nodeNames { - nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() - } - registeredPlugins := append([]tf.RegisterPluginFunc{ - tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, - tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - ) - var objs []runtime.Object - for _, pod := range tt.testPods { - objs = append(objs, pod) - } - cs := 
clientsetfake.NewClientset(objs...) - informerFactory := informers.NewSharedInformerFactory(cs, 0) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) - fwk, err := tf.NewFramework( - ctx, - registeredPlugins, "", - frameworkruntime.WithClientSet(cs), - frameworkruntime.WithLogger(logger), - frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), - frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)), - frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), - frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")), - ) - if err != nil { - t.Fatal(err) - } - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) - fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{} - pe := Evaluator{ - PluginName: "FakePreemptionScorePostFilter", - Handler: fwk, - Interface: fakePreemptionScorePostFilterPlugin, - State: framework.NewCycleState(), - } + nodes := make([]*v1.Node, len(tt.nodeNames)) + for i, nodeName := range tt.nodeNames { + nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() + } + registeredPlugins := append([]tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + var objs []runtime.Object + for _, pod := range tt.testPods { + objs = append(objs, pod) + } - status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin") - if tt.expectedStatus == nil { - if status != nil { - t.Errorf("expect nil status, but got %v", status) + requestStopper := make(chan struct{}) + mu := &sync.RWMutex{} + deletedPods := sets.New[string]() + deletionFailure := false // whether any request to delete pod failed + patchFailure := false // whether any request to patch pod status failed + + cs := clientsetfake.NewClientset(objs...) 
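+				// The reactors registered below intercept the API calls made during preemption:
+				// they block on requestStopper so the test can assert on intermediate state first,
+				// and they return errors for pods named "fail-victim" to simulate delete/patch failures.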
+ cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + <-requestStopper + mu.Lock() + defer mu.Unlock() + name := action.(clienttesting.DeleteAction).GetName() + if name == "fail-victim" { + deletionFailure = true + return true, nil, fmt.Errorf("delete pod failed") + } + + deletedPods.Insert(name) + return true, nil, nil + }) + + cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + <-requestStopper + mu.Lock() + defer mu.Unlock() + if action.(clienttesting.PatchAction).GetName() == "fail-victim" { + patchFailure = true + return true, nil, fmt.Errorf("patch pod status failed") + } + return true, nil, nil + }) + + informerFactory := informers.NewSharedInformerFactory(cs, 0) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) + fakeActivator := &fakePodActivator{activatedPods: make(map[string]*v1.Pod), mu: mu} + fwk, err := tf.NewFramework( + ctx, + registeredPlugins, "", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithLogger(logger), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")), + frameworkruntime.WithPodActivator(fakeActivator), + ) + if err != nil { + t.Fatal(err) } - } else { - if status == nil { - t.Errorf("expect status %v, but got nil", tt.expectedStatus) - } else if status.Code() != tt.expectedStatus.Code() { - t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code()) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{} + pe := NewEvaluator("FakePreemptionScorePostFilter", fwk, fakePreemptionScorePostFilterPlugin, asyncPreemptionEnabled) + + if asyncPreemptionEnabled { + pe.prepareCandidateAsync(tt.candidate, tt.preemptor, "test-plugin") + pe.mu.Lock() + // The preempting map should be registered synchronously + // so we don't need wait.Poll. 
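+					// At this point the delete/patch reactors are still blocked on requestStopper,
+					// so the goroutine launched by prepareCandidateAsync cannot have finished yet
+					// and the preemptor must still be tracked in pe.preempting.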
+ if !tt.expectedPreemptingMap.Equal(pe.preempting) { + t.Errorf("expected preempting map %v, got %v", tt.expectedPreemptingMap, pe.preempting) + close(requestStopper) + pe.mu.Unlock() + return + } + pe.mu.Unlock() + // make the requests complete + close(requestStopper) + } else { + close(requestStopper) // no need to stop requests + status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin") + if tt.expectedStatus == nil { + if status != nil { + t.Errorf("expect nil status, but got %v", status) + } + } else { + if status == nil { + t.Errorf("expect status %v, but got nil", tt.expectedStatus) + } else if status.Code() != tt.expectedStatus.Code() { + t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code()) + } else if status.Message() != tt.expectedStatus.Message() { + t.Errorf("expect status message %v, but got %v", tt.expectedStatus.Message(), status.Message()) + } + } } - } - }) + + var lastErrMsg string + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + mu.RLock() + defer mu.RUnlock() + if !deletedPods.Equal(sets.New(tt.expectedDeletedPods...)) { + lastErrMsg = fmt.Sprintf("expected deleted pods %v, got %v", tt.expectedDeletedPods, deletedPods.UnsortedList()) + return false, nil + } + if tt.expectedDeletionError != deletionFailure { + lastErrMsg = fmt.Sprintf("expected deletion error %v, got %v", tt.expectedDeletionError, deletionFailure) + return false, nil + } + if tt.expectedPatchError != patchFailure { + lastErrMsg = fmt.Sprintf("expected patch error %v, got %v", tt.expectedPatchError, patchFailure) + return false, nil + } + + if asyncPreemptionEnabled { + if tt.expectedActivatedPods != nil && !reflect.DeepEqual(tt.expectedActivatedPods, fakeActivator.activatedPods) { + lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", tt.expectedActivatedPods, fakeActivator.activatedPods) + return false, nil + } + if tt.expectedActivatedPods == nil && len(fakeActivator.activatedPods) != 0 { + lastErrMsg = fmt.Sprintf("expected no activated pods, got %v", fakeActivator.activatedPods) + return false, nil + } + } + + return true, nil + }); err != nil { + t.Fatal(lastErrMsg) + } + }) + } } } @@ -812,7 +989,6 @@ func TestCallExtenders(t *testing.T) { PluginName: "FakePreemptionScorePostFilter", Handler: fwk, Interface: fakePreemptionScorePostFilterPlugin, - State: framework.NewCycleState(), } gotCandidates, status := pe.callExtenders(logger, preemptor, tt.candidates) if (tt.wantStatus == nil) != (status == nil) || status.Code() != tt.wantStatus.Code() { diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index cbe3bcee3d6..0fe2a02577d 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -84,6 +84,7 @@ type frameworkImpl struct { extenders []framework.Extender framework.PodNominator + framework.PodActivator parallelizer parallelize.Parallelizer } @@ -131,6 +132,7 @@ type frameworkOptions struct { snapshotSharedLister framework.SharedLister metricsRecorder *metrics.MetricAsyncRecorder podNominator framework.PodNominator + podActivator framework.PodActivator extenders []framework.Extender captureProfile CaptureProfile parallelizer parallelize.Parallelizer @@ -200,6 +202,12 @@ func WithPodNominator(nominator framework.PodNominator) Option { } } +func WithPodActivator(activator framework.PodActivator) Option { + return func(o *frameworkOptions) { 
+ o.podActivator = activator + } +} + // WithExtenders sets extenders for the scheduling frameworkImpl. func WithExtenders(extenders []framework.Extender) Option { return func(o *frameworkOptions) { @@ -279,6 +287,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler metricsRecorder: options.metricsRecorder, extenders: options.extenders, PodNominator: options.podNominator, + PodActivator: options.podActivator, parallelizer: options.parallelizer, logger: logger, } @@ -427,6 +436,10 @@ func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) { f.PodNominator = n } +func (f *frameworkImpl) SetPodActivator(a framework.PodActivator) { + f.PodActivator = a +} + // Close closes each plugin, when they implement io.Closer interface. func (f *frameworkImpl) Close() error { var errs []error diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index c0dedaf5340..4e63acf95c4 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -296,7 +296,7 @@ func (ce ClusterEvent) Label() string { // AllClusterEventLabels returns all possible cluster event labels given to the metrics. func AllClusterEventLabels() []string { - labels := []string{EventUnschedulableTimeout.Label()} + labels := []string{UnschedulableTimeout, ForceActivate} for _, r := range allResources { for _, a := range basicActionTypes { labels = append(labels, ClusterEvent{Resource: r, ActionType: a}.Label()) diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 84a1bb9e7e5..7fc649793b7 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -40,6 +40,11 @@ const ( Binding = "binding" ) +const ( + GoroutineResultSuccess = "success" + GoroutineResultError = "error" +) + // ExtentionPoints is a list of possible values for the extension_point label. var ExtentionPoints = []string{ PreFilter, @@ -105,14 +110,21 @@ var ( FrameworkExtensionPointDuration *metrics.HistogramVec PluginExecutionDuration *metrics.HistogramVec - // This is only available when the QHint feature gate is enabled. + PermitWaitDuration *metrics.HistogramVec + CacheSize *metrics.GaugeVec + unschedulableReasons *metrics.GaugeVec + PluginEvaluationTotal *metrics.CounterVec + + // The below two are only available when the QHint feature gate is enabled. queueingHintExecutionDuration *metrics.HistogramVec SchedulerQueueIncomingPods *metrics.CounterVec - PermitWaitDuration *metrics.HistogramVec - CacheSize *metrics.GaugeVec - unschedulableReasons *metrics.GaugeVec - PluginEvaluationTotal *metrics.CounterVec - metricsList []metrics.Registerable + + // The below two are only available when the async-preemption feature gate is enabled. + PreemptionGoroutinesDuration *metrics.HistogramVec + PreemptionGoroutinesExecutionTotal *metrics.CounterVec + + // metricsList is a list of all metrics that should be registered always, regardless of any feature gate's value. + metricsList []metrics.Registerable ) var registerMetrics sync.Once @@ -123,11 +135,14 @@ func Register() { registerMetrics.Do(func() { InitMetrics() RegisterMetrics(metricsList...) 
- if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { - RegisterMetrics(queueingHintExecutionDuration) - RegisterMetrics(InFlightEvents) - } volumebindingmetrics.RegisterVolumeSchedulingMetrics() + + if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { + RegisterMetrics(queueingHintExecutionDuration, InFlightEvents) + } + if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption) { + RegisterMetrics(PreemptionGoroutinesDuration, PreemptionGoroutinesExecutionTotal) + } }) } @@ -317,6 +332,25 @@ func InitMetrics() { StabilityLevel: metrics.ALPHA, }, []string{"plugin", "extension_point", "profile"}) + PreemptionGoroutinesDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "preemption_goroutines_duration_seconds", + Help: "Duration in seconds for running goroutines for the preemption.", + Buckets: metrics.ExponentialBuckets(0.01, 2, 20), + StabilityLevel: metrics.ALPHA, + }, + []string{"result"}) + + PreemptionGoroutinesExecutionTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: SchedulerSubsystem, + Name: "preemption_goroutines_execution_total", + Help: "Number of preemption goroutines executed.", + StabilityLevel: metrics.ALPHA, + }, + []string{"result"}) + metricsList = []metrics.Registerable{ scheduleAttempts, schedulingLatency, diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 37591bd29b0..e0b6ba644da 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -355,6 +355,7 @@ func New(ctx context.Context, for _, fwk := range profiles { fwk.SetPodNominator(podQueue) + fwk.SetPodActivator(podQueue) } schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod) diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 3a6fcccfd7b..a38b272100b 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -310,6 +310,12 @@ func (p *PodWrapper) Name(s string) *PodWrapper { return p } +// Name sets `s` as the name of the inner pod. +func (p *PodWrapper) GenerateName(s string) *PodWrapper { + p.SetGenerateName(s) + return p +} + // UID sets `s` as the UID of the inner pod. func (p *PodWrapper) UID(s string) *PodWrapper { p.SetUID(types.UID(s)) diff --git a/test/e2e/feature/feature.go b/test/e2e/feature/feature.go index 16b318be01d..d703a15da48 100644 --- a/test/e2e/feature/feature.go +++ b/test/e2e/feature/feature.go @@ -312,6 +312,10 @@ var ( // TODO: document the feature (owning SIG, when to use this feature for a test) RegularResourceUsageTracking = framework.WithFeature(framework.ValidFeatures.Add("RegularResourceUsageTracking")) + // Owner: sig-scheduling + // Marks tests of the asynchronous preemption (KEP-4832) that require the `SchedulerAsyncPreemption` feature gate. + SchedulerAsyncPreemption = framework.WithFeature(framework.ValidFeatures.Add("SchedulerAsyncPreemption")) + // Owner: sig-network // Marks tests that require a pod networking implementation that supports SCTP // traffic between pods. 
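To make the wiring above concrete: frameworkImpl now carries a framework.PodActivator alongside the PodNominator, and scheduler.New passes the scheduling queue to both (fwk.SetPodNominator(podQueue) and fwk.SetPodActivator(podQueue)), so the asynchronous preemption goroutine can push its preemptor back into activeQ once its API calls finish or fail. The sketch below is illustrative only and is not part of this change: it redeclares the PodActivator method set locally (matching the fakePodActivator used in the tests above) and uses a hypothetical loggingActivator as a stand-in for the queue.

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

// PodActivator mirrors the method set of the scheduler framework interface that
// frameworkImpl now embeds; in the real scheduler the implementation is the
// scheduling queue itself.
type PodActivator interface {
	Activate(logger klog.Logger, pods map[string]*v1.Pod)
}

// loggingActivator is a hypothetical stand-in for the scheduling queue.
type loggingActivator struct{}

func (loggingActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
	for name := range pods {
		logger.Info("Activating pod", "pod", name)
	}
}

func main() {
	var activator PodActivator = loggingActivator{}
	preemptor := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "preemptor"}}
	// When an asynchronous preemption goroutine hits an API error, it can hand the
	// preemptor back to the activator so the pod is retried instead of staying
	// gated in the unschedulable pool.
	activator.Activate(klog.Background(), map[string]*v1.Pod{preemptor.Name: preemptor})
}

In the real scheduler the activator passed to fwk.SetPodActivator is the PriorityQueue itself, so Activate re-enqueues the pod into activeQ rather than just logging it.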
diff --git a/test/e2e/scheduling/preemption.go b/test/e2e/scheduling/preemption.go index a3668c94fd4..28909c3b1b9 100644 --- a/test/e2e/scheduling/preemption.go +++ b/test/e2e/scheduling/preemption.go @@ -43,6 +43,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -305,6 +306,157 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() { } }) + /* + Release: v1.32 + Testname: Scheduler runs the preemption with various priority classes expectedly + Description: When there are Pods with various priority classes running the preemption, + the scheduler must prioritize the Pods with the higher priority class. + */ + framework.It("validates various priority Pods preempt expectedly with the async preemption", feature.SchedulerAsyncPreemption, func(ctx context.Context) { + var podRes v1.ResourceList + // Create 10 pods per node that will eat up all the node's resources. + ginkgo.By("Create 10 low-priority pods on each node.") + lowPriorityPods := make([]*v1.Pod, 0, 10*len(nodeList.Items)) + // Create pods in the cluster. + for i, node := range nodeList.Items { + // Update each node to advertise 3 available extended resources + e2enode.AddExtendedResource(ctx, cs, node.Name, testExtendedResource, resource.MustParse("10")) + + // Create 10 low priority pods on each node, which will use up 10/10 of the node's resources. + for j := 0; j < 10; j++ { + // Request 1 of the available resources for the victim pods + podRes = v1.ResourceList{} + podRes[testExtendedResource] = resource.MustParse("1") + pausePod := createPausePod(ctx, f, pausePodConfig{ + Name: fmt.Sprintf("pod%d-%d-%v", i, j, lowPriorityClassName), + PriorityClassName: lowPriorityClassName, + // This victim pod will be preempted by the high priority pod. + // But, the deletion will be blocked by the finalizer. + // + // The finalizer is needed to prevent the medium Pods from being scheduled instead of the high Pods, + // depending on when the scheduler notices the existence of all the high Pods we create. + Finalizers: []string{testFinalizer}, + Resources: &v1.ResourceRequirements{ + Requests: podRes, + Limits: podRes, + }, + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchFields: []v1.NodeSelectorRequirement{ + {Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}}, + }, + }, + }, + }, + }, + }, + }) + lowPriorityPods = append(lowPriorityPods, pausePod) + framework.Logf("Created pod: %v", pausePod.Name) + } + } + + ginkgo.By("Wait for lower priority pods to be scheduled.") + for _, pod := range lowPriorityPods { + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, cs, pod)) + } + + highPriorityPods := make([]*v1.Pod, 0, 5*len(nodeList.Items)) + mediumPriorityPods := make([]*v1.Pod, 0, 10*len(nodeList.Items)) + + ginkgo.By("Run high/medium priority pods that have same requirements as that of lower priority pod") + for i := range nodeList.Items { + // Create medium priority pods first + // to confirm the scheduler finally prioritize the high priority pods, ignoring the medium priority pods. 
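+			// Creating the medium-priority pods first means they are already pending when the
+			// victims' resources free up, so the test verifies that the scheduler still prefers
+			// the later-created high-priority pods over them.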
+ for j := 0; j < 10; j++ { + // 5 pods per node will be unschedulable + // because the node only has 10 resource, and high priority pods will use 5 resource. + p := createPausePod(ctx, f, pausePodConfig{ + Name: fmt.Sprintf("pod%d-%d-%v", i, j, mediumPriorityClassName), + PriorityClassName: mediumPriorityClassName, + Resources: &v1.ResourceRequirements{ + // Set the pod request to the low priority pod's resources + Requests: lowPriorityPods[0].Spec.Containers[0].Resources.Requests, + Limits: lowPriorityPods[0].Spec.Containers[0].Resources.Requests, + }, + }) + mediumPriorityPods = append(mediumPriorityPods, p) + } + + for j := 0; j < 5; j++ { + p := createPausePod(ctx, f, pausePodConfig{ + Name: fmt.Sprintf("pod%d-%d-%v", i, j, highPriorityClassName), + PriorityClassName: highPriorityClassName, + Resources: &v1.ResourceRequirements{ + // Set the pod request to the low priority pod's resources + Requests: lowPriorityPods[0].Spec.Containers[0].Resources.Requests, + Limits: lowPriorityPods[0].Spec.Containers[0].Resources.Requests, + }, + }) + highPriorityPods = append(highPriorityPods, p) + } + } + + // All low priority Pods should be the target of preemption. + // Those Pods have a finalizer and hence should not be deleted yet at this point. + ginkgo.By("Check all low priority pods to be about to preempted.") + for _, pod := range lowPriorityPods { + framework.ExpectNoError(wait.PollUntilContextTimeout(ctx, time.Second, framework.PodStartTimeout, false, func(ctx context.Context) (bool, error) { + preemptedPod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return preemptedPod.DeletionTimestamp != nil, nil + })) + } + + // All high priority Pods should be schedulable by removing the low priority Pods. 
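+		// Their .status.nominatedNodeName is set once victims are selected, even though
+		// the victims are still terminating behind the finalizer.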
+ ginkgo.By("Wait for high priority pods to be ready for the preemption.") + for _, pod := range highPriorityPods { + framework.ExpectNoError(wait.PollUntilContextTimeout(ctx, time.Second, framework.PodStartTimeout, false, func(ctx context.Context) (bool, error) { + highPod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return highPod.Status.NominatedNodeName != "", nil + })) + } + + ginkgo.By("Remove the finalizer from all low priority pods to proceed the preemption.") + for _, pod := range lowPriorityPods { + // Remove the finalizer so that the pod can be deleted by GC + e2epod.NewPodClient(f).RemoveFinalizer(ctx, pod.Name, testFinalizer) + } + + ginkgo.By("Wait for high priority pods to be scheduled.") + for _, pod := range highPriorityPods { + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, cs, pod)) + } + + ginkgo.By("Wait for 5 medium priority pods to be scheduled.") + framework.ExpectNoError(wait.PollUntilContextTimeout(ctx, time.Second, framework.PodStartTimeout, false, func(ctx context.Context) (bool, error) { + scheduled := 0 + for _, pod := range mediumPriorityPods { + medPod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + + if medPod.Spec.NodeName != "" { + scheduled++ + } + } + if scheduled > 5 { + return false, fmt.Errorf("expected 5 medium priority pods to be scheduled, but got %d", scheduled) + } + + return scheduled == 5, nil + })) + }) + /* Release: v1.31 Testname: Verify the DisruptionTarget condition is added to the preempted pod diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml index b3063aef6c7..a829c62413d 100644 --- a/test/featuregates_linter/test_data/versioned_feature_list.yaml +++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml @@ -1058,6 +1058,12 @@ lockToDefault: false preRelease: Alpha version: "1.29" +- name: SchedulerAsyncPreemption + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.32" - name: SchedulerQueueingHints versionedSpecs: - default: false diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 403da74d663..e633267a515 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -33,23 +33,34 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" configv1 "k8s.io/kube-scheduler/config/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" + "k8s.io/kubernetes/pkg/scheduler/apis/config" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" + "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption" + plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" 
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" + "k8s.io/kubernetes/pkg/scheduler/framework/preemption" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/plugin/pkg/admission/priority" testutils "k8s.io/kubernetes/test/integration/util" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) // imported from testutils @@ -452,59 +463,562 @@ func TestPreemption(t *testing.T) { t.Fatalf("Error creating node: %v", err) } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - filter.Tokens = test.initTokens - filter.EnablePreFilter = test.enablePreFilter - filter.Unresolvable = test.unresolvable - pods := make([]*v1.Pod, len(test.existingPods)) - // Create and run existingPods. - for i, p := range test.existingPods { - pods[i], err = runPausePod(cs, p) - if err != nil { - t.Fatalf("Error running pause pod: %v", err) - } - } - // Create the "pod". - preemptor, err := createPausePod(cs, test.pod) - if err != nil { - t.Errorf("Error while creating high priority pod: %v", err) - } - // Wait for preemption of pods and make sure the other ones are not preempted. - for i, p := range pods { - if _, found := test.preemptedPodIndexes[i]; found { - if err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, wait.ForeverTestTimeout, false, - podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { - t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name) - } - pod, err := cs.CoreV1().Pods(p.Namespace).Get(testCtx.Ctx, p.Name, metav1.GetOptions{}) + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, asyncPreemptionEnabled) + + filter.Tokens = test.initTokens + filter.EnablePreFilter = test.enablePreFilter + filter.Unresolvable = test.unresolvable + pods := make([]*v1.Pod, len(test.existingPods)) + // Create and run existingPods. + for i, p := range test.existingPods { + pods[i], err = runPausePod(cs, p) if err != nil { - t.Errorf("Error %v when getting the updated status for pod %v/%v ", err, p.Namespace, p.Name) + t.Fatalf("Error running pause pod: %v", err) } - _, cond := podutil.GetPodCondition(&pod.Status, v1.DisruptionTarget) - if cond == nil { - t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(pod), v1.DisruptionTarget) - } - } else { - if p.DeletionTimestamp != nil { + } + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + // Wait for preemption of pods and make sure the other ones are not preempted. 
+ for i, p := range pods { + if _, found := test.preemptedPodIndexes[i]; found { + if err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, wait.ForeverTestTimeout, false, + podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name) + } + pod, err := cs.CoreV1().Pods(p.Namespace).Get(testCtx.Ctx, p.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error %v when getting the updated status for pod %v/%v ", err, p.Namespace, p.Name) + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.DisruptionTarget) + if cond == nil { + t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(pod), v1.DisruptionTarget) + } + } else if p.DeletionTimestamp != nil { t.Errorf("Didn't expect pod %v to get preempted.", p.Name) } } - } - // Also check that the preemptor pod gets the NominatedNodeName field set. - if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeName(cs, preemptor); err != nil { - t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err) + // Also check that the preemptor pod gets the NominatedNodeName field set. + if len(test.preemptedPodIndexes) > 0 { + if err := waitForNominatedNodeName(cs, preemptor); err != nil { + t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err) + } } + + // Cleanup + pods = append(pods, preemptor) + testutils.CleanupPods(testCtx.Ctx, cs, t, pods) + }) + } + } +} + +func TestAsyncPreemption(t *testing.T) { + type createPod struct { + pod *v1.Pod + // count is the number of times the pod should be created by this action. + // i.e., if you use it, you have to use GenerateName. + // By default, it's 1. + count *int + } + + type schedulePod struct { + podName string + expectSuccess bool + } + + type scenario struct { + // name is this step's name, just for the debugging purpose. + name string + + // Only one of the following actions should be set. + + // createPod creates a Pod. + createPod *createPod + // schedulePod schedules one Pod that is at the top of the activeQ. + // You should give a Pod name that is supposed to be scheduled. + schedulePod *schedulePod + // completePreemption completes the preemption that is currently on-going. + // You should give a Pod name. + completePreemption string + // podGatedInQueue checks if the given Pod is in the scheduling queue and gated by the preemption. + // You should give a Pod name. + podGatedInQueue string + // podRunningPreemption checks if the given Pod is running preemption. + // You should give a Pod index representing the order of Pod creation. + // e.g., if you want to check the Pod created first in the test case, you should give 0. + podRunningPreemption *int + } + + tests := []struct { + name string + // scenarios after the first attempt of scheduling the pod. + scenarios []scenario + }{ + { + // Very basic test case: if it fails, the basic scenario is broken somewhere. 
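+			// Flow: two 2-CPU victims fill the 4-CPU node, the preemptor triggers preemption,
+			// stays gated in the queue while the (blocked) preemption API calls are in flight,
+			// and becomes schedulable once those calls are allowed to complete.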
+ name: "basic: async preemption happens expectedly", + scenarios: []scenario{ + { + name: "create scheduled Pod", + createPod: &createPod{ + pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(), + count: ptr.To(2), + }, + }, + { + name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", + schedulePod: &schedulePod{ + podName: "preemptor", + }, + }, + { + name: "check the pod is in the queue and gated", + podGatedInQueue: "preemptor", + }, + { + name: "check the preemptor Pod making the preemption API calls", + podRunningPreemption: ptr.To(2), + }, + { + name: "complete the preemption API calls", + completePreemption: "preemptor", + }, + { + name: "schedule the preemptor Pod after the preemption", + schedulePod: &schedulePod{ + podName: "preemptor", + expectSuccess: true, + }, + }, + }, + }, + { + name: "Lower priority Pod doesn't take over the place for higher priority Pod that is running the preemption", + scenarios: []scenario{ + { + name: "create scheduled Pod", + createPod: &createPod{ + pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(), + count: ptr.To(2), + }, + }, + { + name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", + schedulePod: &schedulePod{ + podName: "preemptor-high-priority", + }, + }, + { + name: "check the pod is in the queue and gated", + podGatedInQueue: "preemptor-high-priority", + }, + { + name: "check the preemptor Pod making the preemption API calls", + podRunningPreemption: ptr.To(2), + }, + { + // This Pod is lower priority than the preemptor Pod. + // Given the preemptor Pod is nominated to the node, this Pod should be unschedulable. + name: "create a second Pod that is lower priority than the first preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("pod-mid-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(50).Obj(), + }, + }, + { + name: "schedule the mid-priority Pod", + schedulePod: &schedulePod{ + podName: "pod-mid-priority", + }, + }, + { + name: "complete the preemption API calls", + completePreemption: "preemptor-high-priority", + }, + { + // the preemptor pod should be popped from the queue before the mid-priority pod. 
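+					// The activeQ is ordered by priority, and the preemptor's priority (100) is
+					// higher than the mid-priority pod's (50).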
+ name: "schedule the preemptor Pod again", + schedulePod: &schedulePod{ + podName: "preemptor-high-priority", + expectSuccess: true, + }, + }, + { + name: "schedule the mid-priority Pod again", + schedulePod: &schedulePod{ + podName: "pod-mid-priority", + }, + }, + }, + }, + { + name: "Higher priority Pod takes over the place for lower priority Pod that is running the preemption", + scenarios: []scenario{ + { + name: "create scheduled Pod", + createPod: &createPod{ + pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(), + count: ptr.To(4), + }, + }, + { + name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", + schedulePod: &schedulePod{ + podName: "preemptor-high-priority", + }, + }, + { + name: "check the pod is in the queue and gated", + podGatedInQueue: "preemptor-high-priority", + }, + { + name: "check the preemptor Pod making the preemption API calls", + podRunningPreemption: ptr.To(4), + }, + { + // This Pod is higher priority than the preemptor Pod. + // Even though the preemptor Pod is nominated to the node, this Pod can take over the place. + name: "create a second Pod that is higher priority than the first preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor-super-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(200).Obj(), + }, + }, + { + name: "schedule the super-high-priority Pod", + schedulePod: &schedulePod{ + podName: "preemptor-super-high-priority", + }, + }, + { + name: "check the super-high-priority Pod making the preemption API calls", + podRunningPreemption: ptr.To(5), + }, + { + // the super-high-priority preemptor should enter the preemption + // and select the place where the preemptor-high-priority selected. + // So, basically both goroutines are preempting the same Pods. + name: "check the super-high-priority pod is in the queue and gated", + podGatedInQueue: "preemptor-super-high-priority", + }, + { + name: "complete the preemption API calls of super-high-priority", + completePreemption: "preemptor-super-high-priority", + }, + { + name: "complete the preemption API calls of high-priority", + completePreemption: "preemptor-high-priority", + }, + { + name: "schedule the super-high-priority Pod", + schedulePod: &schedulePod{ + podName: "preemptor-super-high-priority", + expectSuccess: true, + }, + }, + { + name: "schedule the high-priority Pod", + schedulePod: &schedulePod{ + podName: "preemptor-high-priority", + }, + }, + }, + }, + { + name: "Lower priority Pod can select the same place where the higher priority Pod is preempting if the node is big enough", + scenarios: []scenario{ + { + name: "create scheduled Pod", + createPod: &createPod{ + pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(), + count: ptr.To(4), + }, + }, + { + // It will preempt two victims. 
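+					// The node's 4 CPU are fully occupied by four 1-CPU victims and this preemptor
+					// requests 2 CPU, so two victims must be evicted.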
+ name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", + schedulePod: &schedulePod{ + podName: "preemptor-high-priority", + }, + }, + { + name: "check the pod is in the queue and gated", + podGatedInQueue: "preemptor-high-priority", + }, + { + name: "check the preemptor Pod making the preemption API calls", + podRunningPreemption: ptr.To(4), + }, + { + // This Pod is lower priority than the preemptor Pod. + // Given the preemptor Pod is nominated to the node, this Pod should be unschedulable. + // This Pod will trigger the preemption to target the two victims that the first Pod doesn't target. + name: "create a second Pod that is lower priority than the first preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor-mid-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Priority(50).Obj(), + }, + }, + { + name: "schedule the mid-priority Pod", + schedulePod: &schedulePod{ + podName: "preemptor-mid-priority", + }, + }, + { + name: "check the mid-priority pod is in the queue and gated", + podGatedInQueue: "preemptor-mid-priority", + }, + { + name: "check the mid-priority Pod making the preemption API calls", + podRunningPreemption: ptr.To(5), + }, + { + name: "complete the preemption API calls", + completePreemption: "preemptor-mid-priority", + }, + { + name: "complete the preemption API calls", + completePreemption: "preemptor-high-priority", + }, + { + // the preemptor pod should be popped from the queue before the mid-priority pod. + name: "schedule the preemptor Pod again", + schedulePod: &schedulePod{ + podName: "preemptor-high-priority", + expectSuccess: true, + }, + }, + { + name: "schedule the mid-priority Pod again", + schedulePod: &schedulePod{ + podName: "preemptor-mid-priority", + expectSuccess: true, + }, + }, + }, + }, + } + + // All test cases have the same node. + node := st.MakeNode().Name("node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj() + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // We need to use a custom preemption plugin to test async preemption behavior + delayedPreemptionPluginName := "delay-preemption" + // keyed by the pod name + preemptionDoneChannels := make(map[string]chan struct{}) + defer func() { + for _, ch := range preemptionDoneChannels { + close(ch) + } + }() + registry := make(frameworkruntime.Registry) + var preemptionPlugin *defaultpreemption.DefaultPreemption + err := registry.Register(delayedPreemptionPluginName, func(c context.Context, r runtime.Object, fh framework.Handle) (framework.Plugin, error) { + p, err := frameworkruntime.FactoryAdapter(plfeature.Features{EnableAsyncPreemption: true}, defaultpreemption.New)(c, &config.DefaultPreemptionArgs{ + // Set default values to pass the validation at the initialization, not related to the test. 
+ MinCandidateNodesPercentage: 10, + MinCandidateNodesAbsolute: 100, + }, fh) + if err != nil { + return nil, fmt.Errorf("error creating default preemption plugin: %w", err) + } + + var ok bool + preemptionPlugin, ok = p.(*defaultpreemption.DefaultPreemption) + if !ok { + return nil, fmt.Errorf("unexpected plugin type %T", p) + } + + preemptPodFn := preemptionPlugin.Evaluator.PreemptPod + preemptionPlugin.Evaluator.PreemptPod = func(ctx context.Context, c preemption.Candidate, preemptor, victim *v1.Pod, pluginName string) error { + // block the preemption goroutine to complete until the test case allows it to proceed. + if ch, ok := preemptionDoneChannels[preemptor.Name]; ok { + <-ch + } + return preemptPodFn(ctx, c, preemptor, victim, pluginName) + } + + return preemptionPlugin, nil + }) + if err != nil { + t.Fatalf("Error registering a filter: %v", err) + } + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: pointer.String(v1.DefaultSchedulerName), + Plugins: &configv1.Plugins{ + MultiPoint: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: delayedPreemptionPluginName}, + }, + Disabled: []configv1.Plugin{ + {Name: names.DefaultPreemption}, + }, + }, + }, + }}, + }) + + // It initializes the scheduler, but doesn't start. + // We manually trigger the scheduling cycle. + testCtx := testutils.InitTestSchedulerWithOptions(t, + testutils.InitTestAPIServer(t, "preemption", nil), + 0, + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + // disable backoff + scheduler.WithPodMaxBackoffSeconds(0), + scheduler.WithPodInitialBackoffSeconds(0), + ) + testutils.SyncSchedulerInformerFactory(testCtx) + cs := testCtx.ClientSet + + if preemptionPlugin == nil { + t.Fatalf("the preemption plugin should be initialized") } - // Cleanup - pods = append(pods, preemptor) - testutils.CleanupPods(testCtx.Ctx, cs, t, pods) + logger, _ := ktesting.NewTestContext(t) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, true) + + createdPods := []*v1.Pod{} + defer testutils.CleanupPods(testCtx.Ctx, cs, t, createdPods) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create an initial Node %q: %v", node.Name, err) + } + defer func() { + if err := cs.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to delete the Node %q: %v", node.Name, err) + } + }() + + for _, scenario := range test.scenarios { + t.Logf("Running scenario: %s", scenario.name) + switch { + case scenario.createPod != nil: + if scenario.createPod.count == nil { + scenario.createPod.count = ptr.To(1) + } + + for i := 0; i < *scenario.createPod.count; i++ { + pod, err := cs.CoreV1().Pods(testCtx.NS.Name).Create(ctx, scenario.createPod.pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create a Pod %q: %v", pod.Name, err) + } + createdPods = append(createdPods, pod) + } + case scenario.schedulePod != nil: + lastFailure := "" + if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + if len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == 0 { + lastFailure = fmt.Sprintf("Expected the pod %s to be scheduled, but no pod arrives at the activeQ", 
scenario.schedulePod.podName) + return false, nil + } + + if testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name != scenario.schedulePod.podName { + // need to wait more because maybe the queue will get another Pod that higher priority than the current top pod. + lastFailure = fmt.Sprintf("The pod %s is expected to be scheduled, but the top Pod is %s", scenario.schedulePod.podName, testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name) + return false, nil + } + + return true, nil + }); err != nil { + t.Fatal(lastFailure) + } + + preemptionDoneChannels[scenario.schedulePod.podName] = make(chan struct{}) + testCtx.Scheduler.ScheduleOne(testCtx.Ctx) + if scenario.schedulePod.expectSuccess { + if err := wait.PollUntilContextTimeout(testCtx.Ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, testCtx.NS.Name, scenario.schedulePod.podName)); err != nil { + t.Fatalf("Expected the pod %s to be scheduled", scenario.schedulePod.podName) + } + } else { + if !podInUnschedulablePodPool(t, testCtx.Scheduler.SchedulingQueue, scenario.schedulePod.podName) { + t.Fatalf("Expected the pod %s to be in the queue after the scheduling attempt", scenario.schedulePod.podName) + } + } + case scenario.completePreemption != "": + if _, ok := preemptionDoneChannels[scenario.completePreemption]; !ok { + t.Fatalf("The preemptor Pod %q is not running preemption", scenario.completePreemption) + } + + close(preemptionDoneChannels[scenario.completePreemption]) + delete(preemptionDoneChannels, scenario.completePreemption) + case scenario.podGatedInQueue != "": + // make sure the Pod is in the queue in the first place. + if !podInUnschedulablePodPool(t, testCtx.Scheduler.SchedulingQueue, scenario.podGatedInQueue) { + t.Fatalf("Expected the pod %s to be in the queue", scenario.podGatedInQueue) + } + + // Make sure this Pod is gated by the preemption at PreEnqueue extension point + // by activating the Pod and see if it's still in the unsched pod pool. + testCtx.Scheduler.SchedulingQueue.Activate(logger, map[string]*v1.Pod{scenario.podGatedInQueue: st.MakePod().Namespace(testCtx.NS.Name).Name(scenario.podGatedInQueue).Obj()}) + if !podInUnschedulablePodPool(t, testCtx.Scheduler.SchedulingQueue, scenario.podGatedInQueue) { + t.Fatalf("Expected the pod %s to be in the queue even after the activation", scenario.podGatedInQueue) + } + case scenario.podRunningPreemption != nil: + if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + return preemptionPlugin.Evaluator.IsPodRunningPreemption(createdPods[*scenario.podRunningPreemption].GetUID()), nil + }); err != nil { + t.Fatalf("Expected the pod %s to be running preemption", createdPods[*scenario.podRunningPreemption].Name) + } + } + } }) } } +// podInUnschedulablePodPool checks if the given Pod is in the unschedulable pod pool. +func podInUnschedulablePodPool(t *testing.T, queue queue.SchedulingQueue, podName string) bool { + t.Helper() + // First, look for the pod in the activeQ. + for _, pod := range queue.PodsInActiveQ() { + if pod.Name == podName { + return false + } + } + + pendingPods, _ := queue.PendingPods() + for _, pod := range pendingPods { + if pod.Name == podName { + return true + } + } + return false +} + // TestNonPreemption tests NonPreempt option of PriorityClass of scheduler works as expected. 
func TestNonPreemption(t *testing.T) { var preemptNever = v1.PreemptNever @@ -554,32 +1068,35 @@ func TestNonPreemption(t *testing.T) { if err != nil { t.Fatalf("Error creating nodes: %v", err) } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - defer testutils.CleanupPods(testCtx.Ctx, cs, t, []*v1.Pod{preemptor, victim}) - preemptor.Spec.PreemptionPolicy = test.PreemptionPolicy - victimPod, err := createPausePod(cs, victim) - if err != nil { - t.Fatalf("Error while creating victim: %v", err) - } - if err := waitForPodToScheduleWithTimeout(cs, victimPod, 5*time.Second); err != nil { - t.Fatalf("victim %v should be become scheduled", victimPod.Name) - } - preemptorPod, err := createPausePod(cs, preemptor) - if err != nil { - t.Fatalf("Error while creating preemptor: %v", err) - } + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + defer testutils.CleanupPods(testCtx.Ctx, cs, t, []*v1.Pod{preemptor, victim}) + preemptor.Spec.PreemptionPolicy = test.PreemptionPolicy + victimPod, err := createPausePod(cs, victim) + if err != nil { + t.Fatalf("Error while creating victim: %v", err) + } + if err := waitForPodToScheduleWithTimeout(cs, victimPod, 5*time.Second); err != nil { + t.Fatalf("victim %v should be become scheduled", victimPod.Name) + } - err = waitForNominatedNodeNameWithTimeout(cs, preemptorPod, 5*time.Second) - // test.PreemptionPolicy == nil means we expect the preemptor to be nominated. - expect := test.PreemptionPolicy == nil - // err == nil indicates the preemptor is indeed nominated. - got := err == nil - if got != expect { - t.Errorf("Expect preemptor to be nominated=%v, but got=%v", expect, got) - } - }) + preemptorPod, err := createPausePod(cs, preemptor) + if err != nil { + t.Fatalf("Error while creating preemptor: %v", err) + } + + err = waitForNominatedNodeNameWithTimeout(cs, preemptorPod, 5*time.Second) + // test.PreemptionPolicy == nil means we expect the preemptor to be nominated. + expect := test.PreemptionPolicy == nil + // err == nil indicates the preemptor is indeed nominated. + got := err == nil + if got != expect { + t.Errorf("Expect preemptor to be nominated=%v, but got=%v", expect, got) + } + }) + } } } @@ -630,35 +1147,37 @@ func TestDisablePreemption(t *testing.T) { t.Fatalf("Error creating nodes: %v", err) } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - pods := make([]*v1.Pod, len(test.existingPods)) - // Create and run existingPods. - for i, p := range test.existingPods { - pods[i], err = runPausePod(cs, p) - if err != nil { - t.Fatalf("Test [%v]: Error running pause pod: %v", test.name, err) + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + pods := make([]*v1.Pod, len(test.existingPods)) + // Create and run existingPods. + for i, p := range test.existingPods { + pods[i], err = runPausePod(cs, p) + if err != nil { + t.Fatalf("Test [%v]: Error running pause pod: %v", test.name, err) + } + } + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + // Ensure preemptor should keep unschedulable. 
+ if err := waitForPodUnschedulable(testCtx.Ctx, cs, preemptor); err != nil { + t.Errorf("Preemptor %v should not become scheduled", preemptor.Name) } - } - // Create the "pod". - preemptor, err := createPausePod(cs, test.pod) - if err != nil { - t.Errorf("Error while creating high priority pod: %v", err) - } - // Ensure preemptor should keep unschedulable. - if err := waitForPodUnschedulable(testCtx.Ctx, cs, preemptor); err != nil { - t.Errorf("Preemptor %v should not become scheduled", preemptor.Name) - } - // Ensure preemptor should not be nominated. - if err := waitForNominatedNodeNameWithTimeout(cs, preemptor, 5*time.Second); err == nil { - t.Errorf("Preemptor %v should not be nominated", preemptor.Name) - } + // Ensure preemptor should not be nominated. + if err := waitForNominatedNodeNameWithTimeout(cs, preemptor, 5*time.Second); err == nil { + t.Errorf("Preemptor %v should not be nominated", preemptor.Name) + } - // Cleanup - pods = append(pods, preemptor) - testutils.CleanupPods(testCtx.Ctx, cs, t, pods) - }) + // Cleanup + pods = append(pods, preemptor) + testutils.CleanupPods(testCtx.Ctx, cs, t, pods) + }) + } } } @@ -736,9 +1255,9 @@ func TestPodPriorityResolution(t *testing.T) { } pods := make([]*v1.Pod, 0, len(tests)) - for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - t.Run(test.Name, func(t *testing.T) { + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.Name, asyncPreemptionEnabled), func(t *testing.T) { pod, err := runPausePod(cs, test.Pod) if err != nil { if test.ExpectedError == nil { @@ -757,10 +1276,10 @@ func TestPodPriorityResolution(t *testing.T) { } else { t.Errorf("Expected pod %v to have priority %v but was nil", pod.Name, test.PriorityClass) } + testutils.CleanupPods(testCtx.Ctx, cs, t, pods) }) - }) + } } - testutils.CleanupPods(testCtx.Ctx, cs, t, pods) testutils.CleanupNodes(cs, t) } @@ -824,58 +1343,60 @@ func TestPreemptionStarvation(t *testing.T) { t.Fatalf("Error creating nodes: %v", err) } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - pendingPods := make([]*v1.Pod, test.numExpectedPending) - numRunningPods := test.numExistingPod - test.numExpectedPending - runningPods := make([]*v1.Pod, numRunningPods) - // Create and run existingPods. - for i := 0; i < numRunningPods; i++ { - runningPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0)) + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + pendingPods := make([]*v1.Pod, test.numExpectedPending) + numRunningPods := test.numExistingPod - test.numExpectedPending + runningPods := make([]*v1.Pod, numRunningPods) + // Create and run existingPods. + for i := 0; i < numRunningPods; i++ { + runningPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0)) + if err != nil { + t.Fatalf("Error creating pause pod: %v", err) + } + } + // make sure that runningPods are all scheduled. + for _, p := range runningPods { + if err := testutils.WaitForPodToSchedule(cs, p); err != nil { + t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err) + } + } + // Create pending pods. 
+ for i := 0; i < test.numExpectedPending; i++ { + pendingPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0)) + if err != nil { + t.Fatalf("Error creating pending pod: %v", err) + } + } + // Make sure that all pending pods are being marked unschedulable. + for _, p := range pendingPods { + if err := wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, + podUnschedulable(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Pod %v/%v didn't get marked unschedulable: %v", p.Namespace, p.Name, err) + } + } + // Create the preemptor. + preemptor, err := createPausePod(cs, test.preemptor) if err != nil { - t.Fatalf("Error creating pause pod: %v", err) + t.Errorf("Error while creating the preempting pod: %v", err) } - } - // make sure that runningPods are all scheduled. - for _, p := range runningPods { - if err := testutils.WaitForPodToSchedule(cs, p); err != nil { - t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err) + // Check if .status.nominatedNodeName of the preemptor pod gets set. + if err := waitForNominatedNodeName(cs, preemptor); err != nil { + t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) } - } - // Create pending pods. - for i := 0; i < test.numExpectedPending; i++ { - pendingPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0)) - if err != nil { - t.Fatalf("Error creating pending pod: %v", err) + // Make sure that preemptor is scheduled after preemptions. + if err := testutils.WaitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil { + t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err) } - } - // Make sure that all pending pods are being marked unschedulable. - for _, p := range pendingPods { - if err := wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, - podUnschedulable(cs, p.Namespace, p.Name)); err != nil { - t.Errorf("Pod %v/%v didn't get marked unschedulable: %v", p.Namespace, p.Name, err) - } - } - // Create the preemptor. - preemptor, err := createPausePod(cs, test.preemptor) - if err != nil { - t.Errorf("Error while creating the preempting pod: %v", err) - } - // Check if .status.nominatedNodeName of the preemptor pod gets set. - if err := waitForNominatedNodeName(cs, preemptor); err != nil { - t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) - } - // Make sure that preemptor is scheduled after preemptions. - if err := testutils.WaitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil { - t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err) - } - // Cleanup - klog.Info("Cleaning up all pods...") - allPods := pendingPods - allPods = append(allPods, runningPods...) - allPods = append(allPods, preemptor) - testutils.CleanupPods(testCtx.Ctx, cs, t, allPods) - }) + // Cleanup + klog.Info("Cleaning up all pods...") + allPods := pendingPods + allPods = append(allPods, runningPods...) 
+ allPods = append(allPods, preemptor) + testutils.CleanupPods(testCtx.Ctx, cs, t, allPods) + }) + } } } @@ -924,72 +1445,74 @@ func TestPreemptionRaces(t *testing.T) { t.Fatalf("Error creating nodes: %v", err) } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if test.numRepetitions <= 0 { - test.numRepetitions = 1 - } - for n := 0; n < test.numRepetitions; n++ { - initialPods := make([]*v1.Pod, test.numInitialPods) - additionalPods := make([]*v1.Pod, test.numAdditionalPods) - // Create and run existingPods. - for i := 0; i < test.numInitialPods; i++ { - initialPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0)) + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + if test.numRepetitions <= 0 { + test.numRepetitions = 1 + } + for n := 0; n < test.numRepetitions; n++ { + initialPods := make([]*v1.Pod, test.numInitialPods) + additionalPods := make([]*v1.Pod, test.numAdditionalPods) + // Create and run existingPods. + for i := 0; i < test.numInitialPods; i++ { + initialPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0)) + if err != nil { + t.Fatalf("Error creating pause pod: %v", err) + } + } + // make sure that initial Pods are all scheduled. + for _, p := range initialPods { + if err := testutils.WaitForPodToSchedule(cs, p); err != nil { + t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err) + } + } + // Create the preemptor. + klog.Info("Creating the preemptor pod...") + preemptor, err := createPausePod(cs, test.preemptor) if err != nil { - t.Fatalf("Error creating pause pod: %v", err) + t.Errorf("Error while creating the preempting pod: %v", err) } - } - // make sure that initial Pods are all scheduled. - for _, p := range initialPods { - if err := testutils.WaitForPodToSchedule(cs, p); err != nil { - t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err) - } - } - // Create the preemptor. - klog.Info("Creating the preemptor pod...") - preemptor, err := createPausePod(cs, test.preemptor) - if err != nil { - t.Errorf("Error while creating the preempting pod: %v", err) - } - klog.Info("Creating additional pods...") - for i := 0; i < test.numAdditionalPods; i++ { - additionalPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0)) - if err != nil { - t.Fatalf("Error creating pending pod: %v", err) + klog.Info("Creating additional pods...") + for i := 0; i < test.numAdditionalPods; i++ { + additionalPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0)) + if err != nil { + t.Fatalf("Error creating pending pod: %v", err) + } + } + // Check that the preemptor pod gets nominated node name. + if err := waitForNominatedNodeName(cs, preemptor); err != nil { + t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) + } + // Make sure that preemptor is scheduled after preemptions. + if err := testutils.WaitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil { + t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err) } - } - // Check that the preemptor pod gets nominated node name. 
- if err := waitForNominatedNodeName(cs, preemptor); err != nil { - t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) - } - // Make sure that preemptor is scheduled after preemptions. - if err := testutils.WaitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil { - t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err) - } - klog.Info("Check unschedulable pods still exists and were never scheduled...") - for _, p := range additionalPods { - pod, err := cs.CoreV1().Pods(p.Namespace).Get(testCtx.Ctx, p.Name, metav1.GetOptions{}) - if err != nil { - t.Errorf("Error in getting Pod %v/%v info: %v", p.Namespace, p.Name, err) - } - if len(pod.Spec.NodeName) > 0 { - t.Errorf("Pod %v/%v is already scheduled", p.Namespace, p.Name) - } - _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) - if cond != nil && cond.Status != v1.ConditionFalse { - t.Errorf("Pod %v/%v is no longer unschedulable: %v", p.Namespace, p.Name, err) + klog.Info("Check unschedulable pods still exists and were never scheduled...") + for _, p := range additionalPods { + pod, err := cs.CoreV1().Pods(p.Namespace).Get(testCtx.Ctx, p.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error in getting Pod %v/%v info: %v", p.Namespace, p.Name, err) + } + if len(pod.Spec.NodeName) > 0 { + t.Errorf("Pod %v/%v is already scheduled", p.Namespace, p.Name) + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + if cond != nil && cond.Status != v1.ConditionFalse { + t.Errorf("Pod %v/%v is no longer unschedulable: %v", p.Namespace, p.Name, err) + } } + // Cleanup + klog.Info("Cleaning up all pods...") + allPods := additionalPods + allPods = append(allPods, initialPods...) + allPods = append(allPods, preemptor) + testutils.CleanupPods(testCtx.Ctx, cs, t, allPods) } - // Cleanup - klog.Info("Cleaning up all pods...") - allPods := additionalPods - allPods = append(allPods, initialPods...) - allPods = append(allPods, preemptor) - testutils.CleanupPods(testCtx.Ctx, cs, t, allPods) - } - }) + }) + } } } @@ -1126,75 +1649,77 @@ func TestNominatedNodeCleanUp(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ - Profiles: []configv1.KubeSchedulerProfile{{ - SchedulerName: pointer.String(v1.DefaultSchedulerName), - Plugins: tt.customPlugins, - }}, - }) - testCtx := initTest( - t, - "preemption", - scheduler.WithProfiles(cfg.Profiles...), - scheduler.WithFrameworkOutOfTreeRegistry(tt.outOfTreeRegistry), - ) + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, tt := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", tt.name, asyncPreemptionEnabled), func(t *testing.T) { + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: pointer.String(v1.DefaultSchedulerName), + Plugins: tt.customPlugins, + }}, + }) + testCtx := initTest( + t, + "preemption", + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(tt.outOfTreeRegistry), + ) - cs, ns := testCtx.ClientSet, testCtx.NS.Name - // Create a node with the specified capacity. 
- nodeName := "fake-node" - if _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(tt.nodeCapacity).Obj()); err != nil { - t.Fatalf("Error creating node %v: %v", nodeName, err) - } - - // Create pods and run post check if necessary. - for i, pods := range tt.podsToCreate { - for _, p := range pods { - p.Namespace = ns - if _, err := createPausePod(cs, p); err != nil { - t.Fatalf("Error creating pod %v: %v", p.Name, err) - } + cs, ns := testCtx.ClientSet, testCtx.NS.Name + // Create a node with the specified capacity. + nodeName := "fake-node" + if _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(tt.nodeCapacity).Obj()); err != nil { + t.Fatalf("Error creating node %v: %v", nodeName, err) } - // If necessary, run the post check function. - if len(tt.postChecks) > i && tt.postChecks[i] != nil { + + // Create pods and run post check if necessary. + for i, pods := range tt.podsToCreate { for _, p := range pods { - if err := tt.postChecks[i](cs, p); err != nil { - t.Fatalf("Pod %v didn't pass the postChecks[%v]: %v", p.Name, i, err) + p.Namespace = ns + if _, err := createPausePod(cs, p); err != nil { + t.Fatalf("Error creating pod %v: %v", p.Name, err) + } + } + // If necessary, run the post check function. + if len(tt.postChecks) > i && tt.postChecks[i] != nil { + for _, p := range pods { + if err := tt.postChecks[i](cs, p); err != nil { + t.Fatalf("Pod %v didn't pass the postChecks[%v]: %v", p.Name, i, err) + } } } } - } - // Delete the node if necessary. - if tt.deleteNode { - if err := cs.CoreV1().Nodes().Delete(testCtx.Ctx, nodeName, *metav1.NewDeleteOptions(0)); err != nil { - t.Fatalf("Node %v cannot be deleted: %v", nodeName, err) + // Delete the node if necessary. + if tt.deleteNode { + if err := cs.CoreV1().Nodes().Delete(testCtx.Ctx, nodeName, *metav1.NewDeleteOptions(0)); err != nil { + t.Fatalf("Node %v cannot be deleted: %v", nodeName, err) + } } - } - // Force deleting the terminating pods if necessary. - // This is required if we demand to delete terminating Pods physically. - for _, podName := range tt.podNamesToDelete { - if err := deletePod(cs, podName, ns); err != nil { - t.Fatalf("Pod %v cannot be deleted: %v", podName, err) + // Force deleting the terminating pods if necessary. + // This is required if we demand to delete terminating Pods physically. + for _, podName := range tt.podNamesToDelete { + if err := deletePod(cs, podName, ns); err != nil { + t.Fatalf("Pod %v cannot be deleted: %v", podName, err) + } } - } - // Verify if .status.nominatedNodeName is cleared. - if err := wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { - pod, err := cs.CoreV1().Pods(ns).Get(ctx, "medium", metav1.GetOptions{}) - if err != nil { - t.Errorf("Error getting the medium pod: %v", err) + // Verify if .status.nominatedNodeName is cleared. 
+ if err := wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + pod, err := cs.CoreV1().Pods(ns).Get(ctx, "medium", metav1.GetOptions{}) + if err != nil { + t.Errorf("Error getting the medium pod: %v", err) + } + if len(pod.Status.NominatedNodeName) == 0 { + return true, nil + } + return false, err + }); err != nil { + t.Errorf(".status.nominatedNodeName of the medium pod was not cleared: %v", err) } - if len(pod.Status.NominatedNodeName) == 0 { - return true, nil - } - return false, err - }); err != nil { - t.Errorf(".status.nominatedNodeName of the medium pod was not cleared: %v", err) - } - }) + }) + } } } @@ -1405,81 +1930,83 @@ func TestPDBInPreemption(t *testing.T) { }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - for i := 1; i <= test.nodeCnt; i++ { - nodeName := fmt.Sprintf("node-%v", i) - _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(defaultNodeRes).Obj()) - if err != nil { - t.Fatalf("Error creating node %v: %v", nodeName, err) - } - } - - pods := make([]*v1.Pod, len(test.existingPods)) - var err error - // Create and run existingPods. - for i, p := range test.existingPods { - if pods[i], err = runPausePod(cs, p); err != nil { - t.Fatalf("Test [%v]: Error running pause pod: %v", test.name, err) - } - // Add pod condition ready so that PDB is updated. - addPodConditionReady(p) - if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).UpdateStatus(testCtx.Ctx, p, metav1.UpdateOptions{}); err != nil { - t.Fatal(err) - } - } - // Wait for Pods to be stable in scheduler cache. - if err := waitCachedPodsStable(testCtx, test.existingPods); err != nil { - t.Fatalf("Not all pods are stable in the cache: %v", err) - } - - // Create PDBs. - for _, pdb := range test.pdbs { - _, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).Create(testCtx.Ctx, pdb, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Failed to create PDB: %v", err) - } - } - // Wait for PDBs to become stable. - if err := waitForPDBsStable(testCtx, test.pdbs, test.pdbPodNum); err != nil { - t.Fatalf("Not all pdbs are stable in the cache: %v", err) - } - - // Create the "pod". - preemptor, err := createPausePod(cs, test.pod) - if err != nil { - t.Errorf("Error while creating high priority pod: %v", err) - } - // Wait for preemption of pods and make sure the other ones are not preempted. - for i, p := range pods { - if _, found := test.preemptedPodIndexes[i]; found { - if err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, wait.ForeverTestTimeout, false, - podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { - t.Errorf("Test [%v]: Pod %v/%v is not getting evicted.", test.name, p.Namespace, p.Name) - } - } else { - if p.DeletionTimestamp != nil { - t.Errorf("Test [%v]: Didn't expect pod %v/%v to get preempted.", test.name, p.Namespace, p.Name) + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + for i := 1; i <= test.nodeCnt; i++ { + nodeName := fmt.Sprintf("node-%v", i) + _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(defaultNodeRes).Obj()) + if err != nil { + t.Fatalf("Error creating node %v: %v", nodeName, err) } } - } - // Also check if .status.nominatedNodeName of the preemptor pod gets set. 
- if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeName(cs, preemptor); err != nil { - t.Errorf("Test [%v]: .status.nominatedNodeName was not set for pod %v/%v: %v", test.name, preemptor.Namespace, preemptor.Name, err) - } - } - // Cleanup - pods = append(pods, preemptor) - testutils.CleanupPods(testCtx.Ctx, cs, t, pods) - if err := cs.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil { - t.Errorf("error while deleting PDBs, error: %v", err) - } - if err := cs.CoreV1().Nodes().DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil { - t.Errorf("error whiling deleting nodes, error: %v", err) - } - }) + pods := make([]*v1.Pod, len(test.existingPods)) + var err error + // Create and run existingPods. + for i, p := range test.existingPods { + if pods[i], err = runPausePod(cs, p); err != nil { + t.Fatalf("Test [%v]: Error running pause pod: %v", test.name, err) + } + // Add pod condition ready so that PDB is updated. + addPodConditionReady(p) + if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).UpdateStatus(testCtx.Ctx, p, metav1.UpdateOptions{}); err != nil { + t.Fatal(err) + } + } + // Wait for Pods to be stable in scheduler cache. + if err := waitCachedPodsStable(testCtx, test.existingPods); err != nil { + t.Fatalf("Not all pods are stable in the cache: %v", err) + } + + // Create PDBs. + for _, pdb := range test.pdbs { + _, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).Create(testCtx.Ctx, pdb, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create PDB: %v", err) + } + } + // Wait for PDBs to become stable. + if err := waitForPDBsStable(testCtx, test.pdbs, test.pdbPodNum); err != nil { + t.Fatalf("Not all pdbs are stable in the cache: %v", err) + } + + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + // Wait for preemption of pods and make sure the other ones are not preempted. + for i, p := range pods { + if _, found := test.preemptedPodIndexes[i]; found { + if err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, wait.ForeverTestTimeout, false, + podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Test [%v]: Pod %v/%v is not getting evicted.", test.name, p.Namespace, p.Name) + } + } else { + if p.DeletionTimestamp != nil { + t.Errorf("Test [%v]: Didn't expect pod %v/%v to get preempted.", test.name, p.Namespace, p.Name) + } + } + } + // Also check if .status.nominatedNodeName of the preemptor pod gets set. 
+ if len(test.preemptedPodIndexes) > 0 { + if err := waitForNominatedNodeName(cs, preemptor); err != nil { + t.Errorf("Test [%v]: .status.nominatedNodeName was not set for pod %v/%v: %v", test.name, preemptor.Namespace, preemptor.Name, err) + } + } + + // Cleanup + pods = append(pods, preemptor) + testutils.CleanupPods(testCtx.Ctx, cs, t, pods) + if err := cs.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil { + t.Errorf("error while deleting PDBs, error: %v", err) + } + if err := cs.CoreV1().Nodes().DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil { + t.Errorf("error whiling deleting nodes, error: %v", err) + } + }) + } } } @@ -1563,52 +2090,54 @@ func TestPreferNominatedNode(t *testing.T) { }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testCtx := initTestPreferNominatedNode(t, "perfer-nominated-node") - cs := testCtx.ClientSet - nsName := testCtx.NS.Name - var err error - var preemptor *v1.Pod - for _, nodeName := range test.nodeNames { - _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(defaultNodeRes).Obj()) - if err != nil { - t.Fatalf("Error creating node %v: %v", nodeName, err) + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + testCtx := initTestPreferNominatedNode(t, "perfer-nominated-node") + cs := testCtx.ClientSet + nsName := testCtx.NS.Name + var err error + var preemptor *v1.Pod + for _, nodeName := range test.nodeNames { + _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(defaultNodeRes).Obj()) + if err != nil { + t.Fatalf("Error creating node %v: %v", nodeName, err) + } } - } - pods := make([]*v1.Pod, len(test.existingPods)) - // Create and run existingPods. - for i, p := range test.existingPods { - p.Namespace = nsName - pods[i], err = runPausePod(cs, p) + pods := make([]*v1.Pod, len(test.existingPods)) + // Create and run existingPods. 
+ for i, p := range test.existingPods { + p.Namespace = nsName + pods[i], err = runPausePod(cs, p) + if err != nil { + t.Fatalf("Error running pause pod: %v", err) + } + } + test.pod.Namespace = nsName + preemptor, err = createPausePod(cs, test.pod) if err != nil { - t.Fatalf("Error running pause pod: %v", err) + t.Errorf("Error while creating high priority pod: %v", err) } - } - test.pod.Namespace = nsName - preemptor, err = createPausePod(cs, test.pod) - if err != nil { - t.Errorf("Error while creating high priority pod: %v", err) - } - err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { - preemptor, err = cs.CoreV1().Pods(test.pod.Namespace).Get(ctx, test.pod.Name, metav1.GetOptions{}) + err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + preemptor, err = cs.CoreV1().Pods(test.pod.Namespace).Get(ctx, test.pod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error getting the preemptor pod info: %v", err) + } + if len(preemptor.Spec.NodeName) == 0 { + return false, err + } + return true, nil + }) if err != nil { - t.Errorf("Error getting the preemptor pod info: %v", err) + t.Errorf("Cannot schedule Pod %v/%v, error: %v", test.pod.Namespace, test.pod.Name, err) } - if len(preemptor.Spec.NodeName) == 0 { - return false, err + // Make sure the pod has been scheduled to the right node. + if preemptor.Spec.NodeName != test.runningNode { + t.Errorf("Expect pod running on %v, got %v.", test.runningNode, preemptor.Spec.NodeName) } - return true, nil }) - if err != nil { - t.Errorf("Cannot schedule Pod %v/%v, error: %v", test.pod.Namespace, test.pod.Name, err) - } - // Make sure the pod has been scheduled to the right node. - if preemptor.Spec.NodeName != test.runningNode { - t.Errorf("Expect pod running on %v, got %v.", test.runningNode, preemptor.Spec.NodeName) - } - }) + } } } @@ -1912,52 +2441,54 @@ func TestReadWriteOncePodPreemption(t *testing.T) { t.Fatalf("Error creating node: %v", err) } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if err := test.init(); err != nil { - t.Fatalf("Error while initializing test: %v", err) - } + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, test := range tests { + t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) { + if err := test.init(); err != nil { + t.Fatalf("Error while initializing test: %v", err) + } - pods := make([]*v1.Pod, len(test.existingPods)) - t.Cleanup(func() { - testutils.CleanupPods(testCtx.Ctx, cs, t, pods) - if err := test.cleanup(); err != nil { - t.Errorf("Error cleaning up test: %v", err) + pods := make([]*v1.Pod, len(test.existingPods)) + t.Cleanup(func() { + testutils.CleanupPods(testCtx.Ctx, cs, t, pods) + if err := test.cleanup(); err != nil { + t.Errorf("Error cleaning up test: %v", err) + } + }) + // Create and run existingPods. + for i, p := range test.existingPods { + var err error + pods[i], err = runPausePod(cs, p) + if err != nil { + t.Fatalf("Error running pause pod: %v", err) + } + } + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + pods = append(pods, preemptor) + // Wait for preemption of pods and make sure the other ones are not preempted. 
+ for i, p := range pods { + if _, found := test.preemptedPodIndexes[i]; found { + if err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, wait.ForeverTestTimeout, false, + podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name) + } + } else { + if p.DeletionTimestamp != nil { + t.Errorf("Didn't expect pod %v to get preempted.", p.Name) + } + } + } + // Also check that the preemptor pod gets the NominatedNodeName field set. + if len(test.preemptedPodIndexes) > 0 { + if err := waitForNominatedNodeName(cs, preemptor); err != nil { + t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err) + } } }) - // Create and run existingPods. - for i, p := range test.existingPods { - var err error - pods[i], err = runPausePod(cs, p) - if err != nil { - t.Fatalf("Error running pause pod: %v", err) - } - } - // Create the "pod". - preemptor, err := createPausePod(cs, test.pod) - if err != nil { - t.Errorf("Error while creating high priority pod: %v", err) - } - pods = append(pods, preemptor) - // Wait for preemption of pods and make sure the other ones are not preempted. - for i, p := range pods { - if _, found := test.preemptedPodIndexes[i]; found { - if err = wait.PollUntilContextTimeout(testCtx.Ctx, time.Second, wait.ForeverTestTimeout, false, - podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { - t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name) - } - } else { - if p.DeletionTimestamp != nil { - t.Errorf("Didn't expect pod %v to get preempted.", p.Name) - } - } - } - // Also check that the preemptor pod gets the NominatedNodeName field set. - if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeName(cs, preemptor); err != nil { - t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err) - } - } - }) + } } } diff --git a/test/integration/scheduler_perf/misc/performance-config.yaml b/test/integration/scheduler_perf/misc/performance-config.yaml index f189046b58c..6fc3387b13b 100644 --- a/test/integration/scheduler_perf/misc/performance-config.yaml +++ b/test/integration/scheduler_perf/misc/performance-config.yaml @@ -354,10 +354,20 @@ initPods: 2000 measurePods: 500 - name: 5000Nodes - featureGates: - SchedulerQueueingHints: false labels: [performance] threshold: 200 + featureGates: + SchedulerQueueingHints: false + SchedulerAsyncPreemption: false + params: + initNodes: 5000 + initPods: 20000 + measurePods: 5000 + - name: 5000Nodes_AsyncPreemptionEnabled + threshold: 200 + labels: [performance] + featureGates: + SchedulerAsyncPreemption: true params: initNodes: 5000 initPods: 20000 diff --git a/test/integration/scheduler_perf/templates/pod-high-priority.yaml b/test/integration/scheduler_perf/templates/pod-high-priority.yaml index 554e0608d87..68899e8c376 100644 --- a/test/integration/scheduler_perf/templates/pod-high-priority.yaml +++ b/test/integration/scheduler_perf/templates/pod-high-priority.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: Pod metadata: - generateName: pod- + generateName: pod-high-priority- spec: priority: 10 containers:
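Note on how the new asyncPreemptionEnabled loops are expected to take effect: the hunks above only show each table-driven test being wrapped in `for _, asyncPreemptionEnabled := range []bool{true, false}` and the subtest names gaining an "(Async preemption enabled: ...)" suffix; the code that actually feeds the boolean into the scheduler is outside this excerpt. The sketch below is illustrative only — the package, file, and test names are invented, and the real tests may wire the gate differently (for example inside their init helpers) — but it shows the usual per-subtest feature-gate override pattern used in Kubernetes integration tests.

// Hypothetical example file (not part of this PR): preemption_gate_example_test.go
package preemptionexample

import (
	"fmt"
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

// TestPreemptionWithAndWithoutAsyncPreemption is a hypothetical test that mirrors
// the subtest naming used in the hunks above; it is not one of the tests they touch.
func TestPreemptionWithAndWithoutAsyncPreemption(t *testing.T) {
	for _, asyncPreemptionEnabled := range []bool{true, false} {
		t.Run(fmt.Sprintf("Async preemption enabled: %v", asyncPreemptionEnabled), func(t *testing.T) {
			// Override the SchedulerAsyncPreemption gate for this subtest only.
			// In recent component-base versions the helper restores the previous
			// value automatically when the subtest finishes.
			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, asyncPreemptionEnabled)

			// ... start the scheduler test context, create nodes, victim pods,
			// and the preemptor, then assert on .status.nominatedNodeName and
			// the final scheduling outcome, as the tests above do.
		})
	}
}

Running each scenario with the gate both enabled and disabled keeps the existing synchronous preemption path covered alongside the new asynchronous one; the same pairing is introduced in performance-config.yaml via the 5000Nodes and 5000Nodes_AsyncPreemptionEnabled cases.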