diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index 3b009d76fb6..a6b1f066c77 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -160,6 +160,16 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
 	logger.V(4).Info("Update event for unscheduled pod", "pod", klog.KObj(newPod))
 	sched.SchedulingQueue.Update(logger, oldPod, newPod)
+	if hasNominatedNodeNameChanged(oldPod, newPod) {
+		// The nominated node of the pod changed, so we need to treat it as if the pod was deleted from the old nominated node,
+		// because the scheduler treats such a pod as if it was already assigned when scheduling lower or equal priority pods.
+		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, oldPod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(oldPod)))
+	}
+}
+
+// hasNominatedNodeNameChanged returns true when the nominated node name was set and has changed.
+func hasNominatedNodeNameChanged(oldPod, newPod *v1.Pod) bool {
+	return len(oldPod.Status.NominatedNodeName) > 0 && oldPod.Status.NominatedNodeName != newPod.Status.NominatedNodeName
 }
 
 func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
@@ -195,8 +205,21 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
 	// If a waiting pod is rejected, it indicates it's previously assumed and we're
 	// removing it from the scheduler cache. In this case, signal a AssignedPodDelete
 	// event to immediately retry some unscheduled Pods.
+	// Similarly, when a pod that had a nominated node is deleted, it can unblock scheduling of other pods,
+	// because lower or equal priority pods treat such a pod as if it were already assigned.
 	if fwk.RejectWaitingPod(pod.UID) {
 		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, nil)
+	} else if pod.Status.NominatedNodeName != "" {
+		// Note that a nominated pod can fall into the `RejectWaitingPod` case as well,
+		// but in that case the `MoveAllToActiveOrBackoffQueue` call above already covers lower priority pods.
+		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(pod)))
+	}
+}
+
+// getLEPriorityPreCheck is a PreEnqueueCheck function that selects only lower or equal priority pods.
+func getLEPriorityPreCheck(priority int32) queue.PreEnqueueCheck { + return func(pod *v1.Pod) bool { + return corev1helpers.PodPriority(pod) <= priority } } diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 37c7341a858..8f3f0b21345 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -18,6 +18,7 @@ package scheduler import ( "context" + "fmt" "reflect" "testing" "time" @@ -30,8 +31,10 @@ import ( resourceapi "k8s.io/api/resource/v1beta1" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" "k8s.io/apimachinery/pkg/runtime" @@ -42,18 +45,168 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/scheduler/backend/cache" - "k8s.io/kubernetes/pkg/scheduler/backend/queue" + internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" + internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/metrics" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) +func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { + metrics.Register() + highPriorityPod := + st.MakePod().Name("hpp").Namespace("ns1").UID("hppns1").Priority(highPriority).SchedulerName(testSchedulerName).Obj() + + medNominatedPriorityPod := + st.MakePod().Name("mpp").Namespace("ns2").UID("mppns1").Priority(midPriority).SchedulerName(testSchedulerName).NominatedNodeName("node1").Obj() + medPriorityPod := + st.MakePod().Name("smpp").Namespace("ns3").UID("mppns2").Priority(midPriority).SchedulerName(testSchedulerName).Obj() + + lowPriorityPod := + st.MakePod().Name("lpp").Namespace("ns4").UID("lppns1").Priority(lowPriority).SchedulerName(testSchedulerName).Obj() + + unschedulablePods := []*v1.Pod{highPriorityPod, medNominatedPriorityPod, medPriorityPod, lowPriorityPod} + + // Make pods schedulable on Delete event when QHints are enabled, but not when nominated node appears. 
+ queueHintForPodDelete := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + oldPod, _, err := util.As[*v1.Pod](oldObj, newObj) + if err != nil { + t.Errorf("Failed to convert objects to pods: %v", err) + } + if oldPod.Status.NominatedNodeName == "" { + return framework.QueueSkip, nil + } + return framework.Queue, nil + } + queueingHintMap := internalqueue.QueueingHintMapPerProfile{ + testSchedulerName: { + framework.EventAssignedPodDelete: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintForPodDelete, + }, + }, + }, + } + + tests := []struct { + name string + updateFunc func(s *Scheduler) + wantInActive sets.Set[string] + }{ + { + name: "Update of a nominated node name to a different value should trigger rescheduling of lower priority pods", + updateFunc: func(s *Scheduler) { + updatedPod := medNominatedPriorityPod.DeepCopy() + updatedPod.Status.NominatedNodeName = "node2" + updatedPod.ResourceVersion = "1" + s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod) + }, + wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name), + }, + { + name: "Removal of a nominated node name should trigger rescheduling of lower priority pods", + updateFunc: func(s *Scheduler) { + updatedPod := medNominatedPriorityPod.DeepCopy() + updatedPod.Status.NominatedNodeName = "" + updatedPod.ResourceVersion = "1" + s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod) + }, + wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name), + }, + { + name: "Removal of a pod that had nominated node name should trigger rescheduling of lower priority pods", + updateFunc: func(s *Scheduler) { + s.deletePodFromSchedulingQueue(medNominatedPriorityPod) + }, + wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name), + }, + { + name: "Addition of a nominated node name to the high priority pod that did not have it before shouldn't trigger rescheduling", + updateFunc: func(s *Scheduler) { + updatedPod := highPriorityPod.DeepCopy() + updatedPod.Status.NominatedNodeName = "node2" + updatedPod.ResourceVersion = "1" + s.updatePodInSchedulingQueue(highPriorityPod, updatedPod) + }, + wantInActive: sets.New[string](), + }, + } + + for _, tt := range tests { + for _, qHintEnabled := range []bool{false, true} { + t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled) + + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var objs []runtime.Object + for _, pod := range unschedulablePods { + objs = append(objs, pod) + } + client := fake.NewClientset(objs...) 
+ informerFactory := informers.NewSharedInformerFactory(client, 0) + + recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done()) + queue := internalqueue.NewPriorityQueue( + newDefaultQueueSort(), + informerFactory, + internalqueue.WithMetricsRecorder(*recorder), + internalqueue.WithQueueingHintMapPerProfile(queueingHintMap), + // disable backoff queue + internalqueue.WithPodInitialBackoffDuration(0), + internalqueue.WithPodMaxBackoffDuration(0)) + schedulerCache := internalcache.New(ctx, 30*time.Second) + + // Put test pods into unschedulable queue + for _, pod := range unschedulablePods { + queue.Add(logger, pod) + poppedPod, err := queue.Pop(logger) + if err != nil { + t.Fatalf("Pop failed: %v", err) + } + poppedPod.UnschedulablePlugins = sets.New("fooPlugin1") + if err := queue.AddUnschedulableIfNotPresent(logger, poppedPod, queue.SchedulingCycle()); err != nil { + t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + } + + s, _, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory) + if err != nil { + t.Fatalf("Failed to initialize test scheduler: %v", err) + } + + if len(s.SchedulingQueue.PodsInActiveQ()) > 0 { + t.Errorf("No pods were expected to be in the activeQ before the update, but there were %v", s.SchedulingQueue.PodsInActiveQ()) + } + tt.updateFunc(s) + if len(s.SchedulingQueue.PodsInActiveQ()) != len(tt.wantInActive) { + t.Errorf("Different number of pods were expected to be in the activeQ, but found actual %v vs. expected %v", s.SchedulingQueue.PodsInActiveQ(), tt.wantInActive) + } + for _, pod := range s.SchedulingQueue.PodsInActiveQ() { + if !tt.wantInActive.Has(pod.Name) { + t.Errorf("Found unexpected pod in activeQ: %s", pod.Name) + } + } + }) + } + } +} + +func newDefaultQueueSort() framework.LessFunc { + sort := &queuesort.PrioritySort{} + return sort.Less +} + func TestUpdatePodInCache(t *testing.T) { ttl := 10 * time.Second nodeName := "node" @@ -81,8 +234,8 @@ func TestUpdatePodInCache(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() sched := &Scheduler{ - Cache: cache.New(ctx, ttl), - SchedulingQueue: queue.NewTestQueue(ctx, nil), + Cache: internalcache.New(ctx, ttl), + SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), logger: logger, } sched.addPodToCache(tt.oldObj) @@ -354,7 +507,7 @@ func TestAddAllEventHandlers(t *testing.T) { defer cancel() informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(), 0) - schedulingQueue := queue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) + schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) testSched := Scheduler{ StopEverything: ctx.Done(), SchedulingQueue: schedulingQueue, diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports.go b/pkg/scheduler/framework/plugins/nodeports/node_ports.go index 74350150c3c..21835f6c122 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports.go @@ -143,7 +143,7 @@ func (pl *NodePorts) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Po } // If the deleted pod is unscheduled, it doesn't make the target pod schedulable. 
- if deletedPod.Spec.NodeName == "" { + if deletedPod.Spec.NodeName == "" && deletedPod.Status.NominatedNodeName == "" { logger.V(4).Info("the deleted pod is unscheduled and it doesn't make the target pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod)) return framework.QueueSkip, nil } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index ef6c2244c6b..d94554ad6d7 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -294,7 +294,7 @@ func (f *Fit) isSchedulableAfterPodEvent(logger klog.Logger, pod *v1.Pod, oldObj } if modifiedPod == nil { - if originalPod.Spec.NodeName == "" { + if originalPod.Spec.NodeName == "" && originalPod.Status.NominatedNodeName == "" { logger.V(5).Info("the deleted pod was unscheduled and it wouldn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod)) return framework.QueueSkip, nil } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index b0c35b4c660..2387b5ac85a 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -104,7 +104,7 @@ func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Po return framework.QueueSkip, nil } - if deletedPod.Spec.NodeName == "" { + if deletedPod.Spec.NodeName == "" && deletedPod.Status.NominatedNodeName == "" { return framework.QueueSkip, nil } diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index dbdc6d0986e..77dc23bc52d 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -1011,7 +1011,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s nodeInfoToUse := info if i == 0 { var err error - podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info) + podsAdded, stateToUse, nodeInfoToUse, err = addGENominatedPods(ctx, f, pod, state, info) if err != nil { return framework.AsStatus(err) } @@ -1028,10 +1028,10 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s return status } -// addNominatedPods adds pods with equal or greater priority which are nominated +// addGENominatedPods adds pods with equal or greater priority which are nominated // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState, // 3) augmented nodeInfo. -func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) { +func addGENominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) { if fh == nil { // This may happen only in tests. 
return false, state, nodeInfo, nil diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e0b6ba644da..857fa1af9e8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/profile" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" + "k8s.io/utils/clock" ) const ( @@ -116,6 +117,7 @@ func (sched *Scheduler) applyDefaultHandlers() { } type schedulerOptions struct { + clock clock.Clock componentConfigVersion string kubeConfig *restclient.Config // Overridden by profile level percentageOfNodesToScore if set in v1. @@ -227,6 +229,13 @@ func WithExtenders(e ...schedulerapi.Extender) Option { } } +// WithClock sets clock for PriorityQueue, the default clock is clock.RealClock. +func WithClock(clock clock.Clock) Option { + return func(o *schedulerOptions) { + o.clock = clock + } +} + // FrameworkCapturer is used for registering a notify function in building framework. type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile) @@ -238,6 +247,7 @@ func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option { } var defaultSchedulerOptions = schedulerOptions{ + clock: clock.RealClock{}, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), @@ -343,6 +353,7 @@ func New(ctx context.Context, podQueue := internalqueue.NewSchedulingQueue( profiles[options.profiles[0].SchedulerName].QueueSortFunc(), informerFactory, + internalqueue.WithClock(options.clock), internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodLister(podLister), diff --git a/test/integration/scheduler/eventhandler/eventhandler_test.go b/test/integration/scheduler/eventhandler/eventhandler_test.go index 1cd7d53336d..b87bb600154 100644 --- a/test/integration/scheduler/eventhandler/eventhandler_test.go +++ b/test/integration/scheduler/eventhandler/eventhandler_test.go @@ -18,13 +18,19 @@ package eventhandler import ( "context" + "fmt" "testing" + "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-helpers/scheduling/corev1" configv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -32,9 +38,12 @@ import ( st "k8s.io/kubernetes/pkg/scheduler/testing" schedulerutils "k8s.io/kubernetes/test/integration/scheduler" testutils "k8s.io/kubernetes/test/integration/util" + testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" ) +var lowPriority, mediumPriority, highPriority int32 = 100, 200, 300 + var _ framework.FilterPlugin = &fooPlugin{} type fooPlugin struct { @@ -135,3 +144,136 @@ func TestUpdateNodeEvent(t *testing.T) { t.Errorf("Pod %v was not scheduled: %v", pod.Name, err) } } + +func TestUpdateNominatedNodeName(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + testBackoff := time.Minute + testContext := 
testutils.InitTestAPIServer(t, "test-event", nil)
+	capacity := map[v1.ResourceName]string{
+		v1.ResourceMemory: "32",
+	}
+	var cleanupPods []*v1.Pod
+
+	testNode := st.MakeNode().Name("node-0").Label("kubernetes.io/hostname", "node-0").Capacity(capacity).Obj()
+	// Note that the low priority pod cannot fit on the node together with the mid priority pod, but can fit together with the high priority one.
+	podLow := testutils.InitPausePod(&testutils.PausePodConfig{
+		Name:      "test-lp-pod",
+		Namespace: testContext.NS.Name,
+		Priority:  &lowPriority,
+		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+			v1.ResourceMemory: *resource.NewQuantity(20, resource.DecimalSI)},
+		}})
+	cleanupPods = append(cleanupPods, podLow)
+	podMidNominated := testutils.InitPausePod(&testutils.PausePodConfig{
+		Name:      "test-nominated-pod",
+		Namespace: testContext.NS.Name,
+		Priority:  &mediumPriority,
+		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+			v1.ResourceMemory: *resource.NewQuantity(25, resource.DecimalSI)},
+		}})
+	cleanupPods = append(cleanupPods, podMidNominated)
+	podHigh := testutils.InitPausePod(&testutils.PausePodConfig{
+		Name:      "test-hp-pod",
+		Namespace: testContext.NS.Name,
+		Priority:  &highPriority,
+		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+			v1.ResourceMemory: *resource.NewQuantity(10, resource.DecimalSI)},
+		}})
+	cleanupPods = append(cleanupPods, podHigh)
+
+	tests := []struct {
+		name       string
+		updateFunc func(testCtx *testutils.TestContext)
+	}{
+		{
+			name: "Preempt nominated pod",
+			updateFunc: func(testCtx *testutils.TestContext) {
+				// Create the high-priority pod and wait until it's scheduled (which un-nominates the mid-priority pod).
+				pod, err := testutils.CreatePausePod(testCtx.ClientSet, podHigh)
+				if err != nil {
+					t.Fatalf("Creating pod error: %v", err)
+				}
+				if err = testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
+					t.Fatalf("Pod %v was not scheduled: %v", pod.Name, err)
+				}
+			},
+		},
+		{
+			name: "Remove nominated pod",
+			updateFunc: func(testCtx *testutils.TestContext) {
+				if err := testutils.DeletePod(testCtx.ClientSet, podMidNominated.Name, podMidNominated.Namespace); err != nil {
+					t.Fatalf("Deleting pod error: %v", err)
+				}
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		for _, qHintEnabled := range []bool{false, true} {
+			t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) {
+				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
+
+				testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true,
+					scheduler.WithClock(fakeClock),
+					// updateFunc needs to be called while the nominated pod is still in the backoff queue, hence the small, but non-zero, backoff value.
+					scheduler.WithPodInitialBackoffSeconds(int64(testBackoff.Seconds())),
+					scheduler.WithPodMaxBackoffSeconds(int64(testBackoff.Seconds())),
+				)
+				defer teardown()
+
+				_, err := testutils.CreateNode(testCtx.ClientSet, testNode)
+				if err != nil {
+					t.Fatalf("Creating node error: %v", err)
+				}
+
+				// Create the initial low-priority pod and wait until it's scheduled.
+				pod, err := testutils.CreatePausePod(testCtx.ClientSet, podLow)
+				if err != nil {
+					t.Fatalf("Creating pod error: %v", err)
+				}
+				if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
+					t.Fatalf("Pod %v was not scheduled: %v", pod.Name, err)
+				}
+
+				// Create the mid-priority pod and wait until it becomes nominated (preempting the low-priority pod) and remains unschedulable.
+				pod, err = testutils.CreatePausePod(testCtx.ClientSet, podMidNominated)
+				if err != nil {
+					t.Fatalf("Creating pod error: %v", err)
+				}
+				if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
+					t.Errorf("NominatedNodeName field was not set for pod %v: %v", pod.Name, err)
+				}
+				if err := testutils.WaitForPodUnschedulable(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Pod %v hasn't become unschedulable: %v", pod.Name, err)
+				}
+
+				// Remove the initial low-priority pod, which will move the nominated unschedulable pod back to the backoff queue.
+				if err := testutils.DeletePod(testCtx.ClientSet, podLow.Name, podLow.Namespace); err != nil {
+					t.Fatalf("Deleting pod error: %v", err)
+				}
+
+				// Create another low-priority pod, which cannot be scheduled because the mid-priority pod is nominated on the node and the node doesn't have enough resources for both pods.
+				pod, err = testutils.CreatePausePod(testCtx.ClientSet, podLow)
+				if err != nil {
+					t.Fatalf("Creating pod error: %v", err)
+				}
+				if err := testutils.WaitForPodUnschedulable(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
+					t.Fatalf("Pod %v did not become unschedulable: %v", pod.Name, err)
+				}
+
+				// Apply the update that either deletes the nominated pod or clears its nominated node name, which should trigger scheduling of the low-priority pod.
+				// Note that the update has to happen while the nominated pod is still in the backoffQ to actually test updates of nominated, but not yet bound, pods.
+				tt.updateFunc(testCtx)
+
+				// Advance time by the maxPodBackoffSeconds to move the low-priority pod out of the backoff queue.
+				fakeClock.Step(testBackoff)
+
+				// Expect the low-priority pod to be notified about the un-nominated mid-priority pod and to get scheduled, as it should fit this time.
+				if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, podLow); err != nil {
+					t.Fatalf("Pod %v was not scheduled: %v", podLow.Name, err)
+				}
+				testutils.CleanupPods(testCtx.Ctx, testCtx.ClientSet, t, cleanupPods)
+			})
+		}
+	}
+}
diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go
index dc60edb10ce..348f56a384b 100644
--- a/test/integration/scheduler/preemption/preemption_test.go
+++ b/test/integration/scheduler/preemption/preemption_test.go
@@ -84,26 +84,6 @@ const filterPluginName = "filter-plugin"
 
 var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
 
-func waitForNominatedNodeNameWithTimeout(ctx context.Context, cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
-	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, timeout, false, func(ctx context.Context) (bool, error) {
-		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
-		if err != nil {
-			return false, err
-		}
-		if len(pod.Status.NominatedNodeName) > 0 {
-			return true, nil
-		}
-		return false, err
-	}); err != nil {
-		return fmt.Errorf(".status.nominatedNodeName of Pod %v/%v did not get set: %v", pod.Namespace, pod.Name, err)
-	}
-	return nil
-}
-
-func waitForNominatedNodeName(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error {
-	return waitForNominatedNodeNameWithTimeout(ctx, cs, pod, wait.ForeverTestTimeout)
-}
-
 const tokenFilterName = "token-filter"
 
 // tokenFilter is a fake plugin that implements PreFilter and Filter.
@@ -504,7 +484,7 @@ func TestPreemption(t *testing.T) {
 			}
 
 			// Also check that the preemptor pod gets the NominatedNodeName field set.
if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { + if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err) } } @@ -1086,7 +1066,7 @@ func TestNonPreemption(t *testing.T) { t.Fatalf("Error while creating preemptor: %v", err) } - err = waitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptorPod, 5*time.Second) + err = testutils.WaitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptorPod, 5*time.Second) // test.PreemptionPolicy == nil means we expect the preemptor to be nominated. expect := test.PreemptionPolicy == nil // err == nil indicates the preemptor is indeed nominated. @@ -1168,7 +1148,7 @@ func TestDisablePreemption(t *testing.T) { } // Ensure preemptor should not be nominated. - if err := waitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptor, 5*time.Second); err == nil { + if err := testutils.WaitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptor, 5*time.Second); err == nil { t.Errorf("Preemptor %v should not be nominated", preemptor.Name) } @@ -1381,7 +1361,7 @@ func TestPreemptionStarvation(t *testing.T) { t.Errorf("Error while creating the preempting pod: %v", err) } // Check if .status.nominatedNodeName of the preemptor pod gets set. - if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { + if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) } // Make sure that preemptor is scheduled after preemptions. @@ -1481,7 +1461,7 @@ func TestPreemptionRaces(t *testing.T) { } } // Check that the preemptor pod gets nominated node name. - if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { + if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) } // Make sure that preemptor is scheduled after preemptions. @@ -1577,8 +1557,8 @@ func TestNominatedNodeCleanUp(t *testing.T) { }, postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{ testutils.WaitForPodToSchedule, - waitForNominatedNodeName, - waitForNominatedNodeName, + testutils.WaitForNominatedNodeName, + testutils.WaitForNominatedNodeName, }, }, { @@ -1597,7 +1577,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { }, postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{ testutils.WaitForPodToSchedule, - waitForNominatedNodeName, + testutils.WaitForNominatedNodeName, testutils.WaitForPodToSchedule, }, podNamesToDelete: []string{"low"}, @@ -1615,7 +1595,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { }, postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{ testutils.WaitForPodToSchedule, - waitForNominatedNodeName, + testutils.WaitForNominatedNodeName, }, // Delete the node to simulate an ErrNoNodesAvailable error. 
deleteNode: true, @@ -1634,7 +1614,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { }, postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{ testutils.WaitForPodToSchedule, - waitForNominatedNodeName, + testutils.WaitForNominatedNodeName, }, podNamesToDelete: []string{fmt.Sprintf("low-%v", doNotFailMe)}, customPlugins: &configv1.Plugins{ @@ -1990,7 +1970,7 @@ func TestPDBInPreemption(t *testing.T) { } // Also check if .status.nominatedNodeName of the preemptor pod gets set. if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { + if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { t.Errorf("Test [%v]: .status.nominatedNodeName was not set for pod %v/%v: %v", test.name, preemptor.Namespace, preemptor.Name, err) } } @@ -2483,7 +2463,7 @@ func TestReadWriteOncePodPreemption(t *testing.T) { } // Also check that the preemptor pod gets the NominatedNodeName field set. if len(test.preemptedPodIndexes) > 0 { - if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { + if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil { t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err) } } diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 370b2010ccb..72d7f65e513 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -1160,3 +1160,23 @@ func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.Queued } return podInfo } + +func WaitForNominatedNodeNameWithTimeout(ctx context.Context, cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { + if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, timeout, false, func(ctx context.Context) (bool, error) { + pod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if len(pod.Status.NominatedNodeName) > 0 { + return true, nil + } + return false, err + }); err != nil { + return fmt.Errorf(".status.nominatedNodeName of Pod %v/%v did not get set: %w", pod.Namespace, pod.Name, err) + } + return nil +} + +func WaitForNominatedNodeName(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error { + return WaitForNominatedNodeNameWithTimeout(ctx, cs, pod, wait.ForeverTestTimeout) +}
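
The following standalone sketch is not part of the patch; it is a minimal Go illustration of the lower-or-equal-priority filtering that getLEPriorityPreCheck applies when a pod loses its nomination or is deleted. The pod names and priority values are invented for the example, and the scheduler-internal queue.PreEnqueueCheck type is replaced by a plain func(*v1.Pod) bool of the same shape to keep the sketch self-contained. Filtering by priority keeps the requeue targeted: higher-priority pods never treated the nominated pod as occupying resources, so moving them would only add useless scheduling attempts.

// A minimal sketch, assuming only packages that the patch itself already uses.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
	"k8s.io/utils/ptr"
)

// lePriorityPreCheck mirrors getLEPriorityPreCheck: it admits only pods whose
// priority is lower than or equal to the given priority.
func lePriorityPreCheck(priority int32) func(*v1.Pod) bool {
	return func(pod *v1.Pod) bool {
		return corev1helpers.PodPriority(pod) <= priority
	}
}

func main() {
	// Hypothetical pods; only their priorities matter for the check.
	low := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "low"}, Spec: v1.PodSpec{Priority: ptr.To[int32](100)}}
	high := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "high"}, Spec: v1.PodSpec{Priority: ptr.To[int32](300)}}

	// Suppose a pod with priority 200 that was nominated on some node just got deleted.
	check := lePriorityPreCheck(200)

	fmt.Println("requeue low:", check(low))   // true: it may have been blocked by the nominated pod
	fmt.Println("requeue high:", check(high)) // false: it never treated the nominated pod as assigned
}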
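
A second standalone sketch, also not part of the patch, shows why TestUpdateNominatedNodeName injects a fake clock through the new scheduler.WithClock option: backoff expiry is derived from clock.Now(), so stepping a testing clock flushes the backoff queue deterministically instead of sleeping for testBackoff. The backoffExpiry variable below is a simplification standing in for the queue's internal bookkeeping, not a real scheduler API.

// A minimal sketch, assuming only the fake clock API from k8s.io/utils/clock/testing.
package main

import (
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	testBackoff := time.Minute
	fakeClock := testingclock.NewFakeClock(time.Now())

	// The backoff queue records roughly: expiry = clock.Now().Add(backoff).
	backoffExpiry := fakeClock.Now().Add(testBackoff)
	fmt.Println("still backing off:", fakeClock.Now().Before(backoffExpiry)) // true

	// fakeClock.Step(testBackoff) is what the integration test does to move the
	// low-priority pod out of the backoff queue without waiting in real time.
	fakeClock.Step(testBackoff)
	fmt.Println("still backing off:", fakeClock.Now().Before(backoffExpiry)) // false
}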