From b5a156971fa21863996cc6ab207d7120b677d2b7 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Thu, 11 Jul 2024 20:41:08 +0900 Subject: [PATCH] scheduler: impose a backoff penalty on gated Pods --- .../backend/queue/scheduling_queue.go | 9 ++- .../backend/queue/scheduling_queue_test.go | 67 +++++++++++++------ pkg/scheduler/framework/types.go | 2 +- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 56d4ec20bab..24d05e6219f 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -627,9 +627,6 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { // isPodBackingoff returns true if a pod is still waiting for its backoff timer. // If this returns true, the pod should not be re-tried. func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool { - if podInfo.Gated { - return false - } boTime := p.getBackoffTime(podInfo) return boTime.After(p.clock.Now()) } @@ -1240,6 +1237,12 @@ func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Ti // calculateBackoffDuration is a helper function for calculating the backoffDuration // based on the number of attempts the pod has made. func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration { + if podInfo.Attempts == 0 { + // When the Pod hasn't experienced any scheduling attempts, + // it isn't obliged to get a backoff penalty at all. + return 0 + } + duration := p.podInitialBackoffDuration for i := 1; i < podInfo.Attempts; i++ { // Use subtraction instead of addition or multiplication to avoid overflow. 
diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index 566fe6aa5b0..bcc2f544555 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -697,7 +697,7 @@ func Test_InFlightPods(t *testing.T) { case action.eventHappens != nil: q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil) case action.podEnqueued != nil: - err := q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle()) + err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(action.podEnqueued), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -1028,7 +1028,7 @@ func TestPriorityQueue_Update(t *testing.T) { name: "when updating a pod which is in unschedulable queue and is backing off, it will be moved to backoff queue", wantQ: backoffQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)) + q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))) updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.Annotations["foo"] = "test" return medPriorityPodInfo.Pod, updatedPod @@ -1039,7 +1039,7 @@ func TestPriorityQueue_Update(t *testing.T) { name: "when updating a pod which is in unschedulable queue and is not backing off, it will be moved to active queue", wantQ: activeQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)) + q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))) updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.Annotations["foo"] = "test1" // Move 
clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, @@ -1175,7 +1175,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) { // test-pod got rejected by fakePlugin, // but the update event that it just got may change this scheduling result, // and hence we should put this pod to activeQ/backoffQ. - err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(updatedPod, "fakePlugin"), q.SchedulingCycle()) + err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(newQueuedPodInfoForLookup(updatedPod, "fakePlugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -1519,7 +1519,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. }, { name: "Queue queues pod to backoffQ if Pod is backing off", - podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")}, + podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), Attempts: 1, UnschedulablePlugins: sets.New("foo")}, hint: queueHintReturnQueue, expectedQ: backoffQ, }, @@ -1550,6 +1550,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. 
hint: queueHintReturnQueue, expectedQ: activeQ, }, + { + name: "Pod that experienced a scheduling failure before should be queued to backoffQ after un-gated", + podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), Attempts: 1, UnschedulablePlugins: sets.New("foo")}), + hint: queueHintReturnQueue, + expectedQ: backoffQ, + }, } for _, test := range tests { @@ -1618,11 +1624,11 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) - err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -1635,7 +1641,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, hpp1.UID) // This Pod will go to backoffQ because no failure plugin is associated with it. 
- err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle()) + err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1)), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -1648,7 +1654,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, hpp2.UID) // This Pod will go to the unschedulable Pod pool. - err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle()) + err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp2, "barPlugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -1691,9 +1697,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } - unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin") - highPriorityQueuedPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin") - hpp1QueuedPodInfo := q.newQueuedPodInfo(hpp1) + unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")) + highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")) + hpp1QueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1)) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID) err = q.AddUnschedulableIfNotPresent(logger, unschedulableQueuedPodInfo, q.SchedulingCycle()) if err != nil { @@ -1757,25 +1763,25 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before 
AddUnschedulableIfNotPresent()s below. q.Add(logger, medPriorityPodInfo.Pod) - err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } // Construct a Pod, but don't associate its scheduler failure to any plugin hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") // This Pod will go to backoffQ because no failure plugin is associated with it. - err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle()) + err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1)), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") // This Pod will go to the unschedulable Pod pool. 
- err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle()) + err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp2, "barPlugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -1803,9 +1809,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi } } - unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin") - highPriorityQueuedPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin") - hpp1QueuedPodInfo := q.newQueuedPodInfo(hpp1) + unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")) + highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")) + hpp1QueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1)) err = q.AddUnschedulableIfNotPresent(logger, unschedulableQueuedPodInfo, q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) @@ -2071,11 +2077,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } q.Add(logger, medPriorityPodInfo.Pod) - err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "plugin"), q.SchedulingCycle()) + err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "plugin")), q.SchedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "plugin"), q.SchedulingCycle()) + err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "plugin")), q.SchedulingCycle()) if err != nil { 
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -2720,6 +2726,7 @@ var ( Reason: v1.PodReasonUnschedulable, Message: "fake scheduling failure", }) + pInfo = attemptQueuedPodInfo(pInfo) } queue.unschedulablePods.addOrUpdate(pInfo) } @@ -2859,6 +2866,16 @@ func TestPendingPodsMetric(t *testing.T) { totalWithDelay := 20 pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, "z", queueable, timestamp.Add(2*time.Second)) + resetPodInfos := func() { + // reset PodInfo's Attempts because they influence the backoff time calculation. + for i := range pInfos { + pInfos[i].Attempts = 0 + } + for i := range pInfosWithDelay { + pInfosWithDelay[i].Attempts = 0 + } + } + tests := []struct { name string operations []operation @@ -3093,6 +3110,7 @@ scheduler_plugin_execution_duration_seconds_count{extension_point="PreEnqueue",p for _, test := range tests { t.Run(test.name, func(t *testing.T) { resetMetrics() + resetPodInfos() logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -3465,7 +3483,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) } podInfo.UnschedulablePlugins = sets.New("plugin") - err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.activeQ.schedulingCycle()) + err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(podInfo), q.activeQ.schedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } @@ -3870,3 +3888,8 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { t.Error("Expected pod to be ungated") } } + +func attemptQueuedPodInfo(podInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo { + podInfo.Attempts++ + return podInfo +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 4743306ca63..bf334033794 100644 --- a/pkg/scheduler/framework/types.go +++ 
b/pkg/scheduler/framework/types.go @@ -232,7 +232,7 @@ type QueuedPodInfo struct { // The time pod added to the scheduling queue. Timestamp time.Time // Number of schedule attempts before successfully scheduled. - // It's used to record the # attempts metric. + // It's used to record the # attempts metric and calculate the backoff time this Pod is obliged to get before retrying. Attempts int // The time when the pod is added to the queue for the first time. The pod may be added // back to the queue multiple times before it's successfully scheduled.