From 4e99ada05f565e2de8839445089476735b436593 Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Mon, 22 Apr 2024 17:00:58 +0000 Subject: [PATCH 1/4] Filter gated pods before calling isPodWorthRequeueing --- .../internal/queue/scheduling_queue.go | 5 ++ .../internal/queue/scheduling_queue_test.go | 39 ++++++++++++- .../scheduler/plugins/plugins_test.go | 2 +- test/integration/scheduler/queue_test.go | 57 +++++++++++++++++++ 4 files changed, 101 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 115f81afc13..93a8333a4a5 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -1175,6 +1175,11 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn activated := false for _, pInfo := range podInfoList { + // Since there may be many gated pods and they will not move from the + // unschedulable pool, we skip calling the expensive isPodWorthRequeueing. + if pInfo.Gated { + continue + } schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj) if schedulingHint == queueSkip { // QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index e160b89a4e9..bb6c515ed4d 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" "k8s.io/kubernetes/pkg/scheduler/metrics" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/util" @@ -93,6 +94,11 @@ var ( } ) +func setQueuedPodInfoGated(queuedPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo { + queuedPodInfo.Gated = true + return queuedPodInfo +} + func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { pInfo := p.unschedulablePods.get(pod) if pInfo != nil { @@ -1451,6 +1457,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. hint: queueHintReturnSkip, expectedQ: unschedulablePods, }, + { + name: "QueueHintFunction is not called when Pod is gated", + podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")}), + hint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + return framework.Queue, fmt.Errorf("QueueingHintFn should not be called as pod is gated") + }, + expectedQ: unschedulablePods, + }, } for _, test := range tests { @@ -2732,7 +2746,7 @@ func TestPendingPodsMetric(t *testing.T) { gated := makeQueuedPodInfos(total-queueableNum, "y", failme, timestamp) // Manually mark them as gated=true. for _, pInfo := range gated { - pInfo.Gated = true + setQueuedPodInfoGated(pInfo) } pInfos = append(pInfos, gated...) totalWithDelay := 20 @@ -3729,3 +3743,26 @@ func Test_isPodWorthRequeuing(t *testing.T) { }) } } + +func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + plugin, _ := schedulinggates.New(ctx, nil, nil) + m := map[string][]framework.PreEnqueuePlugin{"": {plugin.(framework.PreEnqueuePlugin)}} + q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m)) + + gatedPod := st.MakePod().SchedulingGates([]string{"hello world"}).Obj() + q.Add(logger, gatedPod) + + if !q.unschedulablePods.get(gatedPod).Gated { + t.Error("expected pod to be gated") + } + + ungatedPod := gatedPod.DeepCopy() + ungatedPod.Spec.SchedulingGates = nil + q.Update(logger, gatedPod, ungatedPod) + + ungatedPodInfo, _ := q.Pop(logger) + if ungatedPodInfo.Gated { + t.Error("expected pod to be ungated") + } +} diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 8993b78126e..5b8250738b7 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -2677,7 +2677,7 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) { { name: "preEnqueue plugin with event registered", enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}}, - count: 3, + count: 2, }, } diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 3f452473a1d..1c48fb18310 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -152,6 +152,63 @@ func TestSchedulingGates(t *testing.T) { } } +// We create a gated and non-gated pod. After scheduling and deleting the +// non-gated pod, we ensure that the gated pod is schedulable after the removal +// of its gate. +func TestGatedPodSchedulableAfterEvent(t *testing.T) { + testCtx := testutils.InitTestSchedulerWithOptions( + t, + testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil), + 0, + scheduler.WithPodInitialBackoffSeconds(0), + scheduler.WithPodMaxBackoffSeconds(0), + ) + testutils.SyncSchedulerInformerFactory(testCtx) + cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx + + // create initially gated pod + pod1 := st.MakePod().Namespace(ns).Name("p1").Container("pause").SchedulingGates([]string{"foo"}).Obj() + if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod1, metav1.CreateOptions{}); err != nil { + t.Fatal("Failed to create p1") + } + + // create immediately schedulable pod + pod2 := st.MakePod().Namespace(ns).Name("p2").Container("pause").Obj() + if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod2, metav1.CreateOptions{}); err != nil { + t.Fatal("Failed to create p2") + } + + // create node on which to schedule pods + node := st.MakeNode().Name("node1").Obj() + if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Fatal("Failed to create node1") + } + + // schedule p2 + testCtx.Scheduler.ScheduleOne(testCtx.Ctx) + if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, "p2")); err != nil { + t.Fatal("Failed to schedule p2") + } + + // delete p2, which triggers DeletePodFromCache event + cs.CoreV1().Pods(ns).Delete(ctx, "p2", metav1.DeleteOptions{}) + if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodDeleted(ctx, cs, ns, "p2")); err != nil { + t.Fatal("Failed to delete p2") + } + + // remove gate from p1 + patch := `{"spec": {"schedulingGates": null}}` + if _, err := cs.CoreV1().Pods(ns).Patch(ctx, "p1", types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { + t.Fatal("Failed to remove schedulingGates from p1") + } + + // schedule p1 + testCtx.Scheduler.ScheduleOne(testCtx.Ctx) + if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, "p1")); err != nil { + t.Fatal("Failed to schedule p1") + } +} + // TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be // moved properly upon their registered events. func TestCoreResourceEnqueue(t *testing.T) { From 9a8ec135051bbd0704ac62cc34375dfddbd7b704 Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Tue, 30 Apr 2024 12:06:26 +0000 Subject: [PATCH 2/4] make linter happy --- pkg/scheduler/internal/queue/scheduling_queue_test.go | 8 ++++++-- test/integration/scheduler/queue_test.go | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index bb6c515ed4d..1b2a29da921 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -3751,7 +3751,9 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m)) gatedPod := st.MakePod().SchedulingGates([]string{"hello world"}).Obj() - q.Add(logger, gatedPod) + if err := q.Add(logger, gatedPod); err != nil { + t.Error("Error calling Add") + } if !q.unschedulablePods.get(gatedPod).Gated { t.Error("expected pod to be gated") @@ -3759,7 +3761,9 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { ungatedPod := gatedPod.DeepCopy() ungatedPod.Spec.SchedulingGates = nil - q.Update(logger, gatedPod, ungatedPod) + if err := q.Update(logger, gatedPod, ungatedPod); err != nil { + t.Error("Error calling Update") + } ungatedPodInfo, _ := q.Pop(logger) if ungatedPodInfo.Gated { diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 1c48fb18310..180a69f01ca 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -191,7 +191,9 @@ func TestGatedPodSchedulableAfterEvent(t *testing.T) { } // delete p2, which triggers DeletePodFromCache event - cs.CoreV1().Pods(ns).Delete(ctx, "p2", metav1.DeleteOptions{}) + if err := cs.CoreV1().Pods(ns).Delete(ctx, "p2", metav1.DeleteOptions{}); err != nil { + t.Fatal("Error calling Delete on p2") + } if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodDeleted(ctx, cs, ns, "p2")); err != nil { t.Fatal("Failed to delete p2") } From 6c6be931ee90660d9d29d6200ac55feac6f3055f Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Thu, 2 May 2024 10:29:15 +0000 Subject: [PATCH 3/4] revert unit test --- .../internal/queue/scheduling_queue_test.go | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 1b2a29da921..dd7a9ad84c5 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -44,7 +44,6 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" "k8s.io/kubernetes/pkg/scheduler/metrics" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/util" @@ -3743,30 +3742,3 @@ func Test_isPodWorthRequeuing(t *testing.T) { }) } } - -func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { - logger, ctx := ktesting.NewTestContext(t) - plugin, _ := schedulinggates.New(ctx, nil, nil) - m := map[string][]framework.PreEnqueuePlugin{"": {plugin.(framework.PreEnqueuePlugin)}} - q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m)) - - gatedPod := st.MakePod().SchedulingGates([]string{"hello world"}).Obj() - if err := q.Add(logger, gatedPod); err != nil { - t.Error("Error calling Add") - } - - if !q.unschedulablePods.get(gatedPod).Gated { - t.Error("expected pod to be gated") - } - - ungatedPod := gatedPod.DeepCopy() - ungatedPod.Spec.SchedulingGates = nil - if err := q.Update(logger, gatedPod, ungatedPod); err != nil { - t.Error("Error calling Update") - } - - ungatedPodInfo, _ := q.Pop(logger) - if ungatedPodInfo.Gated { - t.Error("expected pod to be ungated") - } -} From 558945958e240194c54920416fa03f0bc3fbb341 Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Tue, 7 May 2024 08:44:44 +0000 Subject: [PATCH 4/4] refactor integ test --- test/integration/scheduler/queue_test.go | 137 +++++++++-------------- 1 file changed, 54 insertions(+), 83 deletions(-) diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 180a69f01ca..46914c97eee 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -57,11 +57,11 @@ import ( func TestSchedulingGates(t *testing.T) { tests := []struct { - name string - pods []*v1.Pod - want []string - rmPodsSchedulingGates []int - wantPostGatesRemoval []string + name string + pods []*v1.Pod + schedule []string + delete []string + rmGates []string }{ { name: "regular pods", @@ -69,7 +69,7 @@ func TestSchedulingGates(t *testing.T) { st.MakePod().Name("p1").Container("pause").Obj(), st.MakePod().Name("p2").Container("pause").Obj(), }, - want: []string{"p1", "p2"}, + schedule: []string{"p1", "p2"}, }, { name: "one pod carrying scheduling gates", @@ -77,7 +77,7 @@ func TestSchedulingGates(t *testing.T) { st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(), st.MakePod().Name("p2").Container("pause").Obj(), }, - want: []string{"p2"}, + schedule: []string{"p2"}, }, { name: "two pod carrying scheduling gates, and remove gates of one pod", @@ -86,9 +86,18 @@ func TestSchedulingGates(t *testing.T) { st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(), st.MakePod().Name("p3").Container("pause").Obj(), }, - want: []string{"p3"}, - rmPodsSchedulingGates: []int{1}, // remove gates of 'p2' - wantPostGatesRemoval: []string{"p2"}, + schedule: []string{"p3"}, + rmGates: []string{"p2"}, + }, + { + name: "gated pod schedulable after deleting the scheduled pod and removing gate", + pods: []*v1.Pod{ + st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(), + st.MakePod().Name("p2").Container("pause").Obj(), + }, + schedule: []string{"p2"}, + delete: []string{"p2"}, + rmGates: []string{"p1"}, }, } @@ -107,6 +116,15 @@ func TestSchedulingGates(t *testing.T) { testutils.SyncSchedulerInformerFactory(testCtx) cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx + + // Create node, so we can schedule pods. + node := st.MakeNode().Name("node").Obj() + if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Fatal("Failed to create node") + + } + + // Create pods. for _, p := range tt.pods { p.Namespace = ns if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil { @@ -122,95 +140,48 @@ func TestSchedulingGates(t *testing.T) { t.Fatal(err) } - // Pop the expected pods out. They should be de-queueable. - for _, wantPod := range tt.want { - podInfo := testutils.NextPodOrDie(t, testCtx) - if got := podInfo.Pod.Name; got != wantPod { - t.Errorf("Want %v to be popped out, but got %v", wantPod, got) + // Schedule pods. + for _, podName := range tt.schedule { + testCtx.Scheduler.ScheduleOne(testCtx.Ctx) + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, podName)); err != nil { + t.Fatalf("Failed to schedule %s", podName) } } - if len(tt.rmPodsSchedulingGates) == 0 { - return + // Delete pods, which triggers AssignedPodDelete event in the scheduling queue. + for _, podName := range tt.delete { + if err := cs.CoreV1().Pods(ns).Delete(ctx, podName, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Error calling Delete on %s", podName) + } + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodDeleted(ctx, cs, ns, podName)); err != nil { + t.Fatalf("Failed to delete %s", podName) + } } + + // Ensure gated pods are not in ActiveQ + if len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) > 0 { + t.Fatal("Expected no schedulable pods") + } + // Remove scheduling gates from the pod spec. - for _, idx := range tt.rmPodsSchedulingGates { + for _, podName := range tt.rmGates { patch := `{"spec": {"schedulingGates": null}}` - podName := tt.pods[idx].Name if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { t.Fatalf("Failed to patch pod %v: %v", podName, err) } } - // Pop the expected pods out. They should be de-queueable. - for _, wantPod := range tt.wantPostGatesRemoval { - podInfo := testutils.NextPodOrDie(t, testCtx) - if got := podInfo.Pod.Name; got != wantPod { - t.Errorf("Want %v to be popped out, but got %v", wantPod, got) + + // Schedule pods which no longer have gates. + for _, podName := range tt.rmGates { + testCtx.Scheduler.ScheduleOne(testCtx.Ctx) + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, podName)); err != nil { + t.Fatalf("Failed to schedule %s", podName) } } }) } } -// We create a gated and non-gated pod. After scheduling and deleting the -// non-gated pod, we ensure that the gated pod is schedulable after the removal -// of its gate. -func TestGatedPodSchedulableAfterEvent(t *testing.T) { - testCtx := testutils.InitTestSchedulerWithOptions( - t, - testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil), - 0, - scheduler.WithPodInitialBackoffSeconds(0), - scheduler.WithPodMaxBackoffSeconds(0), - ) - testutils.SyncSchedulerInformerFactory(testCtx) - cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx - - // create initially gated pod - pod1 := st.MakePod().Namespace(ns).Name("p1").Container("pause").SchedulingGates([]string{"foo"}).Obj() - if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod1, metav1.CreateOptions{}); err != nil { - t.Fatal("Failed to create p1") - } - - // create immediately schedulable pod - pod2 := st.MakePod().Namespace(ns).Name("p2").Container("pause").Obj() - if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod2, metav1.CreateOptions{}); err != nil { - t.Fatal("Failed to create p2") - } - - // create node on which to schedule pods - node := st.MakeNode().Name("node1").Obj() - if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { - t.Fatal("Failed to create node1") - } - - // schedule p2 - testCtx.Scheduler.ScheduleOne(testCtx.Ctx) - if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, "p2")); err != nil { - t.Fatal("Failed to schedule p2") - } - - // delete p2, which triggers DeletePodFromCache event - if err := cs.CoreV1().Pods(ns).Delete(ctx, "p2", metav1.DeleteOptions{}); err != nil { - t.Fatal("Error calling Delete on p2") - } - if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodDeleted(ctx, cs, ns, "p2")); err != nil { - t.Fatal("Failed to delete p2") - } - - // remove gate from p1 - patch := `{"spec": {"schedulingGates": null}}` - if _, err := cs.CoreV1().Pods(ns).Patch(ctx, "p1", types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { - t.Fatal("Failed to remove schedulingGates from p1") - } - - // schedule p1 - testCtx.Scheduler.ScheduleOne(testCtx.Ctx) - if err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, "p1")); err != nil { - t.Fatal("Failed to schedule p1") - } -} - // TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be // moved properly upon their registered events. func TestCoreResourceEnqueue(t *testing.T) {