scheduler: impose a backoff penalty on gated Pods

This commit is contained in:
Kensei Nakada 2024-07-11 20:41:08 +09:00
parent 9682c62148
commit b5a156971f
3 changed files with 52 additions and 26 deletions

View File

@ -627,9 +627,6 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
if podInfo.Gated {
return false
}
boTime := p.getBackoffTime(podInfo)
return boTime.After(p.clock.Now())
}
@ -1240,6 +1237,12 @@ func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Ti
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
if podInfo.Attempts == 0 {
// When the Pod hasn't experienced any scheduling attempts,
// they aren't obliged to get a backoff penalty at all.
return 0
}
duration := p.podInitialBackoffDuration
for i := 1; i < podInfo.Attempts; i++ {
// Use subtraction instead of addition or multiplication to avoid overflow.

View File

@ -697,7 +697,7 @@ func Test_InFlightPods(t *testing.T) {
case action.eventHappens != nil:
q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil)
case action.podEnqueued != nil:
err := q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle())
err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(action.podEnqueued), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -1028,7 +1028,7 @@ func TestPriorityQueue_Update(t *testing.T) {
name: "when updating a pod which is in unschedulable queue and is backing off, it will be moved to backoff queue",
wantQ: backoffQ,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))
q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)))
updatedPod := medPriorityPodInfo.Pod.DeepCopy()
updatedPod.Annotations["foo"] = "test"
return medPriorityPodInfo.Pod, updatedPod
@ -1039,7 +1039,7 @@ func TestPriorityQueue_Update(t *testing.T) {
name: "when updating a pod which is in unschedulable queue and is not backing off, it will be moved to active queue",
wantQ: activeQ,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))
q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)))
updatedPod := medPriorityPodInfo.Pod.DeepCopy()
updatedPod.Annotations["foo"] = "test1"
// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
@ -1175,7 +1175,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) {
// test-pod got rejected by fakePlugin,
// but the update event that it just got may change this scheduling result,
// and hence we should put this pod to activeQ/backoffQ.
err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(updatedPod, "fakePlugin"), q.SchedulingCycle())
err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(newQueuedPodInfoForLookup(updatedPod, "fakePlugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -1519,7 +1519,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
},
{
name: "Queue queues pod to backoffQ if Pod is backing off",
podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")},
podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), Attempts: 1, UnschedulablePlugins: sets.New("foo")},
hint: queueHintReturnQueue,
expectedQ: backoffQ,
},
@ -1550,6 +1550,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
hint: queueHintReturnQueue,
expectedQ: activeQ,
},
{
name: "Pod that experienced a scheduling failure before should be queued to backoffQ after un-gated",
podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), Attempts: 1, UnschedulablePlugins: sets.New("foo")}),
hint: queueHintReturnQueue,
expectedQ: backoffQ,
},
}
for _, test := range tests {
@ -1618,11 +1624,11 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -1635,7 +1641,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
}
expectInFlightPods(t, q, hpp1.UID)
// This Pod will go to backoffQ because no failure plugin is associated with it.
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1)), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -1648,7 +1654,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
}
expectInFlightPods(t, q, hpp2.UID)
// This Pod will go to the unschedulable Pod pool.
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp2, "barPlugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -1691,9 +1697,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")
highPriorityQueuedPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")
hpp1QueuedPodInfo := q.newQueuedPodInfo(hpp1)
unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"))
highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"))
hpp1QueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1))
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID)
err = q.AddUnschedulableIfNotPresent(logger, unschedulableQueuedPodInfo, q.SchedulingCycle())
if err != nil {
@ -1757,25 +1763,25 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.Add(logger, medPriorityPodInfo.Pod)
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Construct a Pod, but don't associate its scheduler failure to any plugin
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
// This Pod will go to backoffQ because no failure plugin is associated with it.
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1)), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
// This Pod will go to the unschedulable Pod pool.
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(hpp2, "barPlugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -1803,9 +1809,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
}
}
unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")
highPriorityQueuedPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")
hpp1QueuedPodInfo := q.newQueuedPodInfo(hpp1)
unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"))
highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"))
hpp1QueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(hpp1))
err = q.AddUnschedulableIfNotPresent(logger, unschedulableQueuedPodInfo, q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
@ -2071,11 +2077,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
q.Add(logger, medPriorityPodInfo.Pod)
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "plugin"), q.SchedulingCycle())
err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "plugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "plugin"), q.SchedulingCycle())
err = q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "plugin")), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -2720,6 +2726,7 @@ var (
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
pInfo = attemptQueuedPodInfo(pInfo)
}
queue.unschedulablePods.addOrUpdate(pInfo)
}
@ -2859,6 +2866,16 @@ func TestPendingPodsMetric(t *testing.T) {
totalWithDelay := 20
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, "z", queueable, timestamp.Add(2*time.Second))
resetPodInfos := func() {
// reset PodInfo's Attempts because they influence the backoff time calculation.
for i := range pInfos {
pInfos[i].Attempts = 0
}
for i := range pInfosWithDelay {
pInfosWithDelay[i].Attempts = 0
}
}
tests := []struct {
name string
operations []operation
@ -3093,6 +3110,7 @@ scheduler_plugin_execution_duration_seconds_count{extension_point="PreEnqueue",p
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resetMetrics()
resetPodInfos()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -3465,7 +3483,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
}
podInfo.UnschedulablePlugins = sets.New("plugin")
err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.activeQ.schedulingCycle())
err := q.AddUnschedulableIfNotPresent(logger, attemptQueuedPodInfo(podInfo), q.activeQ.schedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
@ -3870,3 +3888,8 @@ func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) {
t.Error("Expected pod to be ungated")
}
}
func attemptQueuedPodInfo(podInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
podInfo.Attempts++
return podInfo
}

View File

@ -232,7 +232,7 @@ type QueuedPodInfo struct {
// The time pod added to the scheduling queue.
Timestamp time.Time
// Number of schedule attempts before successfully scheduled.
// It's used to record the # attempts metric.
// It's used to record the # attempts metric and calculate the backoff time this Pod is obliged to get before retrying.
Attempts int
// The time when the pod is added to the queue for the first time. The pod may be added
// back to the queue multiple times before it's successfully scheduled.