diff --git a/pkg/scheduler/backend/queue/backoff_queue.go b/pkg/scheduler/backend/queue/backoff_queue.go
index 656e5098e70..41c053389d3 100644
--- a/pkg/scheduler/backend/queue/backoff_queue.go
+++ b/pkg/scheduler/backend/queue/backoff_queue.go
@@ -27,10 +27,18 @@ import (
 	"k8s.io/utils/clock"
 )
 
+// backoffQOrderingWindowDuration is the duration of an ordering window in the podBackoffQ.
+// Within each window, represented as a whole second, pods are ordered by priority.
+// It is the same as the interval at which pods are flushed from the podBackoffQ to the activeQ,
+// so that whole windows are flushed there at once.
+// This works only if the PopFromBackoffQ feature is enabled.
+// See KEP-5142 (http://kep.k8s.io/5142) for the rationale.
+const backoffQOrderingWindowDuration = time.Second
+
 // backoffQueuer is a wrapper for backoffQ related operations.
 type backoffQueuer interface {
 	// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
 	// If this returns true, the pod should not be re-tried.
+	// If the pod backoff time falls within the current ordering window, it is still considered to be backing off.
 	isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
 	// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
 	popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
@@ -39,6 +47,11 @@ type backoffQueuer interface {
 	podInitialBackoffDuration() time.Duration
 	// podMaxBackoffDuration returns maximum backoff duration that pod can get.
 	podMaxBackoffDuration() time.Duration
+	// waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
+	// It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
+	// It's important to align the flushing time, because podBackoffQ's ordering is based on the windows
+	// and whole windows have to be flushed at once, without visible latency.
+	waitUntilAlignedWithOrderingWindow(f func(), stopCh <-chan struct{})
 
 	// add adds the pInfo to backoffQueue.
 	// The event should show which event triggered this addition and is used for the metric recording.
@@ -54,7 +67,7 @@ type backoffQueuer interface {
 	// has inform if pInfo exists in the queue.
 	has(pInfo *framework.QueuedPodInfo) bool
 	// list returns all pods that are in the queue.
-	list() []*framework.QueuedPodInfo
+	list() []*v1.Pod
 	// len returns length of the queue.
 	len() int
 }
@@ -62,7 +75,7 @@ type backoffQueuer interface {
 // backoffQueue implements backoffQueuer and wraps two queues inside,
 // providing seamless access as if it were one queue.
 type backoffQueue struct {
-	clock clock.Clock
+	clock clock.WithTicker
 
 	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
 	// are popped from this heap before the scheduler looks at activeQ
@@ -73,15 +86,27 @@ type backoffQueue struct {
 	podInitialBackoff time.Duration
 	podMaxBackoff     time.Duration
 
+	// activeQLessFn is used as a tie-breaking less function when two backoff times are equal,
+	// if the SchedulerPopFromBackoffQ feature is enabled.
+	activeQLessFn framework.LessFunc
+
+	// isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled.
+	isPopFromBackoffQEnabled bool
 }
 
-func newBackoffQueue(clock clock.Clock, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration) *backoffQueue {
+func newBackoffQueue(clock clock.WithTicker, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration, activeQLessFn framework.LessFunc, popFromBackoffQEnabled bool) *backoffQueue {
 	bq := &backoffQueue{
-		clock:             clock,
-		podInitialBackoff: podInitialBackoffDuration,
-		podMaxBackoff:     podMaxBackoffDuration,
+		clock:                    clock,
+		podInitialBackoff:        podInitialBackoffDuration,
+		podMaxBackoff:            podMaxBackoffDuration,
+		isPopFromBackoffQEnabled: popFromBackoffQEnabled,
+		activeQLessFn:            activeQLessFn,
 	}
-	bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
+	podBackoffQLessFn := bq.lessBackoffCompleted
+	if popFromBackoffQEnabled {
+		podBackoffQLessFn = bq.lessBackoffCompletedWithPriority
+	}
+	bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, podBackoffQLessFn, metrics.NewBackoffPodsRecorder())
 	bq.podErrorBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
 
 	return bq
@@ -97,7 +122,70 @@ func (bq *backoffQueue) podMaxBackoffDuration() time.Duration {
 	return bq.podMaxBackoff
 }
 
-// lessBackoffCompleted is a less function of podBackoffQ and podErrorBackoffQ.
+// alignToWindow truncates the provided time to the podBackoffQ ordering window.
+// It returns the lowest possible timestamp in the window.
+func (bq *backoffQueue) alignToWindow(t time.Time) time.Time {
+	if !bq.isPopFromBackoffQEnabled {
+		return t
+	}
+	return t.Truncate(backoffQOrderingWindowDuration)
+}
+
+// waitUntilAlignedWithOrderingWindow waits until the time reaches a multiple of backoffQOrderingWindowDuration.
+// It then runs the f function at the backoffQOrderingWindowDuration interval using a ticker.
+// It's important to align the flushing time, because podBackoffQ's ordering is based on the windows
+// and whole windows have to be flushed at once, without visible latency.
+func (bq *backoffQueue) waitUntilAlignedWithOrderingWindow(f func(), stopCh <-chan struct{}) {
+	now := bq.clock.Now()
+	// Wait until the time reaches the next multiple of backoffQOrderingWindowDuration.
+	durationToNextWindow := bq.alignToWindow(now.Add(backoffQOrderingWindowDuration)).Sub(now)
+	timer := bq.clock.NewTimer(durationToNextWindow)
+	select {
+	case <-stopCh:
+		timer.Stop()
+		return
+	case <-timer.C():
+	}
+
+	// Run a ticker to make sure the invocations of the f function
+	// are aligned with the backoffQ's ordering window.
+	ticker := bq.clock.NewTicker(backoffQOrderingWindowDuration)
+	for {
+		select {
+		case <-stopCh:
+			return
+		default:
+		}
+
+		f()
+
+		// NOTE: Because select in Go has no case priority, this can race:
+		// ticker.C and stopCh may both be ready, and the ticker.C case could be chosen.
+		// To mitigate this, we re-check stopCh at the beginning
+		// of every loop to prevent extra executions of f().
+		select {
+		case <-stopCh:
+			ticker.Stop()
+			return
+		case <-ticker.C():
+		}
+	}
+}
+
+// lessBackoffCompletedWithPriority is the less function of the podBackoffQ when the PopFromBackoffQ feature is enabled.
+// Within the same backoff ordering window, it orders pods the same way the activeQ does,
+// to improve the popping order from the backoffQ when the activeQ is empty.
+func (bq *backoffQueue) lessBackoffCompletedWithPriority(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
+	bo1 := bq.getBackoffTime(pInfo1)
+	bo2 := bq.getBackoffTime(pInfo2)
+	if !bo1.Equal(bo2) {
+		return bo1.Before(bo2)
+	}
+	// If the backoff times are equal, sort the pods in the same manner as the activeQ does.
+	return bq.activeQLessFn(pInfo1, pInfo2)
+}
+
+// lessBackoffCompleted is a less function of podErrorBackoffQ.
 func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
 	bo1 := bq.getBackoffTime(pInfo1)
 	bo2 := bq.getBackoffTime(pInfo2)
@@ -106,9 +194,11 @@ func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPod
 
 // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
 // If this returns true, the pod should not be re-tried.
+// If the pod backoff time falls within the current ordering window, it is still considered to be backing off.
 func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
 	boTime := bq.getBackoffTime(podInfo)
-	return boTime.After(bq.clock.Now())
+	// Don't use After, because when the windows are equal we want to return true.
+	return !boTime.Before(bq.alignToWindow(bq.clock.Now()))
 }
 
 // getBackoffTime returns the time that podInfo completes backoff.
@@ -117,9 +207,14 @@ func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
 // because of the fact that the backoff time is calculated based on podInfo.Attempts,
 // which doesn't get changed until the pod's scheduling is retried.
 func (bq *backoffQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
+	if podInfo.Attempts == 0 {
+		// Return the zero time for a pod that hasn't been attempted yet (its backoff duration is 0),
+		// so that isPodBackingoff correctly reports that such a pod is not backing off.
+		return time.Time{}
+	}
 	if podInfo.BackoffExpiration.IsZero() {
 		duration := bq.calculateBackoffDuration(podInfo)
-		podInfo.BackoffExpiration = podInfo.Timestamp.Add(duration)
+		podInfo.BackoffExpiration = bq.alignToWindow(podInfo.Timestamp.Add(duration))
 	}
 	return podInfo.BackoffExpiration
 }
@@ -238,8 +333,15 @@ func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
 }
 
 // list returns all pods that are in the queue.
-func (bq *backoffQueue) list() []*framework.QueuedPodInfo {
-	return append(bq.podBackoffQ.List(), bq.podErrorBackoffQ.List()...)
+func (bq *backoffQueue) list() []*v1.Pod {
+	var result []*v1.Pod
+	for _, pInfo := range bq.podBackoffQ.List() {
+		result = append(result, pInfo.Pod)
+	}
+	for _, pInfo := range bq.podErrorBackoffQ.List() {
+		result = append(result, pInfo.Pod)
+	}
+	return result
 }
 
 // len returns length of the queue.
diff --git a/pkg/scheduler/backend/queue/backoff_queue_test.go b/pkg/scheduler/backend/queue/backoff_queue_test.go
index 1a93a7ee32d..c26e3271262 100644
--- a/pkg/scheduler/backend/queue/backoff_queue_test.go
+++ b/pkg/scheduler/backend/queue/backoff_queue_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package queue import ( + "fmt" "math" "testing" "time" @@ -69,7 +70,7 @@ func TestBackoffQueue_calculateBackoffDuration(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration) + bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration, newDefaultQueueSort(), true) if got := bq.calculateBackoffDuration(tt.podInfo); got != tt.want { t.Errorf("backoffQueue.calculateBackoffDuration() = %v, want %v", got, tt.want) } @@ -84,7 +85,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) { PodInfo: &framework.PodInfo{ Pod: st.MakePod().Name("pod0").Obj(), }, - Timestamp: fakeClock.Now().Add(-time.Second), + Timestamp: fakeClock.Now().Add(-2 * time.Second), Attempts: 1, UnschedulablePlugins: sets.New("plugin"), }, @@ -100,7 +101,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) { PodInfo: &framework.PodInfo{ Pod: st.MakePod().Name("pod2").Obj(), }, - Timestamp: fakeClock.Now().Add(-time.Second), + Timestamp: fakeClock.Now().Add(-2 * time.Second), Attempts: 1, }, "pod3": { @@ -147,12 +148,105 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) { wantPods: nil, }, } + for _, tt := range tests { + for _, popFromBackoffQEnabled := range []bool{true, false} { + t.Run(fmt.Sprintf("%s popFromBackoffQEnabled(%v)", tt.name, popFromBackoffQEnabled), func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration, newDefaultQueueSort(), popFromBackoffQEnabled) + for _, podName := range tt.podsInBackoff { + bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label()) + } + var gotPods []string + bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { + gotPods = append(gotPods, pInfo.Pod.Name) + }) + if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" { + t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff) + } + podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods) + if bq.len() != podsToStayInBackoff { + t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len()) + } + }) + } + } +} + +func TestBackoffQueueOrdering(t *testing.T) { + // Align the fake clock with ordering window. 
+ fakeClock := testingclock.NewFakeClock(time.Now().Truncate(backoffQOrderingWindowDuration)) + podInfos := []*framework.QueuedPodInfo{ + { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod0").Priority(1).Obj(), + }, + Timestamp: fakeClock.Now(), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod1").Priority(1).Obj(), + }, + Timestamp: fakeClock.Now().Add(-time.Second), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod2").Priority(2).Obj(), + }, + Timestamp: fakeClock.Now().Add(-2*time.Second + time.Millisecond), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod3").Priority(1).Obj(), + }, + Timestamp: fakeClock.Now().Add(-2 * time.Second), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod4").Priority(2).Obj(), + }, + Timestamp: fakeClock.Now().Add(-2 * time.Second), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod5").Priority(1).Obj(), + }, + Timestamp: fakeClock.Now().Add(-3 * time.Second), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + } + tests := []struct { + name string + popFromBackoffQEnabled bool + wantPods []string + }{ + { + name: "Pods with the same window are ordered by priority if PopFromBackoffQ is enabled", + popFromBackoffQEnabled: true, + wantPods: []string{"pod5", "pod4", "pod2", "pod3"}, + }, + { + name: "Pods priority doesn't matter if PopFromBackoffQ is disabled", + popFromBackoffQEnabled: false, + wantPods: []string{"pod5", "pod3", "pod4", "pod2"}, + }, + } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { logger, _ := ktesting.NewTestContext(t) - bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration) - for _, podName := range tt.podsInBackoff { - bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label()) + bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration, newDefaultQueueSort(), tt.popFromBackoffQEnabled) + for _, podInfo := range podInfos { + bq.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label()) } var gotPods []string bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { @@ -161,10 +255,6 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) { if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" { t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff) } - podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods) - if bq.len() != podsToStayInBackoff { - t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len()) - } }) } } diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 9e3ae718579..340675c4f9d 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -132,6 +132,8 @@ type SchedulingQueue interface { PendingPods() ([]*v1.Pod, string) InFlightPods() []*v1.Pod PodsInActiveQ() []*v1.Pod + // PodsInBackoffQ returns all the Pods in the backoffQ. + PodsInBackoffQ() []*v1.Pod } // NewSchedulingQueue initializes a priority queue as a new scheduling queue. 
@@ -155,7 +157,7 @@ type PriorityQueue struct { *nominator stop chan struct{} - clock clock.Clock + clock clock.WithTicker // lock takes precedence and should be taken first, // before any other locks in the queue (activeQueue.lock or nominator.nLock). @@ -209,7 +211,7 @@ type clusterEvent struct { } type priorityQueueOptions struct { - clock clock.Clock + clock clock.WithTicker podInitialBackoffDuration time.Duration podMaxBackoffDuration time.Duration podMaxInUnschedulablePodsDuration time.Duration @@ -224,7 +226,7 @@ type priorityQueueOptions struct { type Option func(*priorityQueueOptions) // WithClock sets clock for PriorityQueue, the default clock is clock.RealClock. -func WithClock(clock clock.Clock) Option { +func WithClock(clock clock.WithTicker) Option { return func(o *priorityQueueOptions) { o.clock = clock } @@ -334,7 +336,7 @@ func NewPriorityQueue( stop: make(chan struct{}), podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder), - backoffQ: newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration), + backoffQ: newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration, lessFn, isPopFromBackoffQEnabled), unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), preEnqueuePluginMap: options.preEnqueuePluginMap, queueingHintMap: options.queueingHintMap, @@ -352,9 +354,9 @@ func NewPriorityQueue( // Run starts the goroutine to pump from backoffQ to activeQ func (p *PriorityQueue) Run(logger klog.Logger) { - go wait.Until(func() { + go p.backoffQ.waitUntilAlignedWithOrderingWindow(func() { p.flushBackoffQCompleted(logger) - }, 1.0*time.Second, p.stop) + }, p.stop) go wait.Until(func() { p.flushUnschedulablePodsLeftover(logger) }, 30*time.Second, p.stop) @@ -1177,6 +1179,11 @@ func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod { return p.activeQ.list() } +// PodsInBackoffQ returns all the Pods in the backoffQ. +func (p *PriorityQueue) PodsInBackoffQ() []*v1.Pod { + return p.backoffQ.list() +} + var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v" // GetPod searches for a pod in the activeQ, backoffQ, and unschedulablePods. @@ -1215,9 +1222,7 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { defer p.lock.RUnlock() result := p.activeQ.list() activeQLen := len(result) - for _, pInfo := range p.backoffQ.list() { - result = append(result, pInfo.Pod) - } + result = append(result, p.backoffQ.list()...) 
for _, pInfo := range p.unschedulablePods.podInfoMap { result = append(result, pInfo.Pod) } diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index c85fad42d34..baab4dd441c 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -835,18 +835,17 @@ func Test_InFlightPods(t *testing.T) { } if test.wantBackoffQPodNames != nil { - podInfos := q.backoffQ.list() + pods := q.backoffQ.list() var podNames []string - for _, pInfo := range podInfos { - podNames = append(podNames, pInfo.Pod.Name) + for _, pod := range pods { + podNames = append(podNames, pod.Name) } if diff := cmp.Diff(test.wantBackoffQPodNames, podNames, sortOpt); diff != "" { t.Fatalf("Unexpected diff of backoffQ pod names (-want, +got):\n%s", diff) } wantPodNames := sets.New(test.wantBackoffQPodNames...) - for _, podInfo := range podInfos { - podGotFromBackoffQ := podInfo.Pod + for _, podGotFromBackoffQ := range pods { if !wantPodNames.Has(podGotFromBackoffQ.Name) { t.Fatalf("Pod %v was not expected to be in the backoffQ.", podGotFromBackoffQ.Name) } @@ -1121,9 +1120,9 @@ func TestPriorityQueue_Update(t *testing.T) { q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)), framework.EventUnscheduledPodAdd.Label()) updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.Annotations["foo"] = "test1" - // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, + // Move clock by podMaxBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. - c.Step(q.backoffQ.podInitialBackoffDuration()) + c.Step(q.backoffQ.podMaxBackoffDuration()) return medPriorityPodInfo.Pod, updatedPod }, schedulingHintsEnablement: []bool{false, true}, @@ -2000,9 +1999,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { t.Errorf("Expected %v in the backoffQ", hpp1.Name) } - // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, + // Move clock by podMaxBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. - c.Step(q.backoffQ.podInitialBackoffDuration()) + c.Step(q.backoffQ.podMaxBackoffDuration()) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil) if q.activeQ.len() != 4 { @@ -2104,9 +2103,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi t.Errorf("Expected %v in the backoffQ", hpp1.Name) } - // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, + // Move clock by podMaxBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. - c.Step(q.backoffQ.podInitialBackoffDuration()) + c.Step(q.backoffQ.podMaxBackoffDuration()) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. 
q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil) if q.activeQ.len() != 4 { @@ -2762,7 +2761,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - c.Step(DefaultPodInitialBackoffDuration) + c.Step(q.backoffQ.podMaxBackoffDuration()) // Move all unschedulable pods to the active queue. q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) // Simulation is over. Now let's pop all pods. The pod popped first should be @@ -3130,11 +3129,11 @@ var ( queue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) } flushBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { - queue.clock.(*testingclock.FakeClock).Step(2 * time.Second) + queue.clock.(*testingclock.FakeClock).Step(3 * time.Second) queue.flushBackoffQCompleted(logger) } moveClockForward = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { - queue.clock.(*testingclock.FakeClock).Step(2 * time.Second) + queue.clock.(*testingclock.FakeClock).Step(3 * time.Second) } flushUnscheduledQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*testingclock.FakeClock).Step(queue.podMaxInUnschedulablePodsDuration) @@ -3731,7 +3730,6 @@ func TestBackOffFlow(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(cl)) steps := []struct { wantBackoff time.Duration }{ @@ -3743,58 +3741,67 @@ func TestBackOffFlow(t *testing.T) { {wantBackoff: 10 * time.Second}, {wantBackoff: 10 * time.Second}, } - pod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj() + for _, popFromBackoffQEnabled := range []bool{true, false} { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, popFromBackoffQEnabled) - podID := types.NamespacedName{ - Namespace: pod.Namespace, - Name: pod.Name, - } - q.Add(logger, pod) + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(cl)) - for i, step := range steps { - t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) { - timestamp := cl.Now() - // Simulate schedule attempt. - podInfo, err := q.Pop(logger) - if err != nil { - t.Fatal(err) - } - if podInfo.Attempts != i+1 { - t.Errorf("got attempts %d, want %d", podInfo.Attempts, i+1) - } - err = q.AddUnschedulableIfNotPresent(logger, podInfo, int64(i)) - if err != nil { - t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) - } + pod := st.MakePod().Name("test-pod").Namespace("test-ns").UID("test-uid").Obj() + podID := types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + } + q.Add(logger, pod) - // An event happens. - q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) + for i, step := range steps { + t.Run(fmt.Sprintf("step %d popFromBackoffQEnabled(%v)", i, popFromBackoffQEnabled), func(t *testing.T) { + timestamp := cl.Now() + // Simulate schedule attempt. 
+ podInfo, err := q.Pop(logger) + if err != nil { + t.Fatal(err) + } + if podInfo.Attempts != i+1 { + t.Errorf("got attempts %d, want %d", podInfo.Attempts, i+1) + } + err = q.AddUnschedulableIfNotPresent(logger, podInfo, int64(i)) + if err != nil { + t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) + } - if !q.backoffQ.has(podInfo) { - t.Errorf("pod %v is not in the backoff queue", podID) - } + // An event happens. + q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) - // Check backoff duration. - deadline := podInfo.BackoffExpiration - backoff := deadline.Sub(timestamp) - if backoff != step.wantBackoff { - t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff) - } + if !q.backoffQ.has(podInfo) { + t.Errorf("pod %v is not in the backoff queue", podID) + } - // Simulate routine that continuously flushes the backoff queue. - cl.Step(time.Millisecond) - q.flushBackoffQCompleted(logger) - // Still in backoff queue after an early flush. - if !q.backoffQ.has(podInfo) { - t.Errorf("pod %v is not in the backoff queue", podID) - } - // Moved out of the backoff queue after timeout. - cl.Step(backoff) - q.flushBackoffQCompleted(logger) - if q.backoffQ.has(podInfo) { - t.Errorf("pod %v is still in the backoff queue", podID) - } - }) + // Check backoff duration. + deadline := podInfo.BackoffExpiration + backoff := deadline.Sub(timestamp) + if popFromBackoffQEnabled { + // If popFromBackoffQEnabled, the actual backoff can be calculated by rounding up to the ordering window duration. + backoff = backoff.Truncate(backoffQOrderingWindowDuration) + backoffQOrderingWindowDuration + } + if backoff != step.wantBackoff { + t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff) + } + + // Simulate routine that continuously flushes the backoff queue. + cl.Step(backoffQOrderingWindowDuration) + q.flushBackoffQCompleted(logger) + // Still in backoff queue after an early flush. + if !q.backoffQ.has(podInfo) { + t.Errorf("pod %v is not in the backoff queue", podID) + } + // Moved out of the backoff queue after timeout. 
+ cl.Step(backoff) + q.flushBackoffQCompleted(logger) + if q.backoffQ.has(podInfo) { + t.Errorf("pod %v is still in the backoff queue", podID) + } + }) + } } } @@ -3812,20 +3819,20 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { preEnqueueCheck PreEnqueueCheck podInfos []*framework.QueuedPodInfo event framework.ClusterEvent - want []string + want sets.Set[string] }{ { name: "nil PreEnqueueCheck", podInfos: podInfos, event: framework.EventUnschedulableTimeout, - want: []string{"p0", "p1", "p2", "p3", "p4"}, + want: sets.New("p0", "p1", "p2", "p3", "p4"), }, { name: "move Pods with priority greater than 2", podInfos: podInfos, event: framework.EventUnschedulableTimeout, preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 }, - want: []string{"p2", "p3", "p4"}, + want: sets.New("p2", "p3", "p4"), }, { name: "move Pods with even priority and greater than 2", @@ -3834,7 +3841,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority >= 2 }, - want: []string{"p2", "p4"}, + want: sets.New("p2", "p4"), }, { name: "move Pods with even and negative priority", @@ -3856,12 +3863,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := testingclock.NewFakeClock(time.Now()) + c := testingclock.NewFakeClock(time.Now().Truncate(backoffQOrderingWindowDuration)) logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c)) - for i, podInfo := range tt.podInfos { + for _, podInfo := range tt.podInfos { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. q.Add(logger, podInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { @@ -3872,18 +3879,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - // NOTE: On Windows, time.Now() is not as precise, 2 consecutive calls may return the same timestamp, - // resulting in 0 time delta / latency. This will cause the pods to be backed off in a random - // order, which would cause this test to fail, since the expectation is for them to be backed off - // in a certain order. 
- // See: https://github.com/golang/go/issues/8687 - podInfo.Timestamp = podInfo.Timestamp.Add(time.Duration((i - len(tt.podInfos))) * time.Millisecond) } q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck) - var got []string + got := sets.New[string]() c.Step(2 * q.backoffQ.podMaxBackoffDuration()) q.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { - got = append(got, pInfo.Pod.Name) + got.Insert(pInfo.Pod.Name) }) if diff := cmp.Diff(tt.want, got); diff != "" { t.Errorf("Unexpected diff (-want, +got):\n%s", diff) diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index a1bd9321d01..8c631953443 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -98,9 +98,9 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { } tests := []struct { - name string - updateFunc func(s *Scheduler) - wantInActive sets.Set[string] + name string + updateFunc func(s *Scheduler) + wantInActiveOrBackoff sets.Set[string] }{ { name: "Update of a nominated node name to a different value should trigger rescheduling of lower priority pods", @@ -110,7 +110,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { updatedPod.ResourceVersion = "1" s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod) }, - wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name), + wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name), }, { name: "Removal of a nominated node name should trigger rescheduling of lower priority pods", @@ -120,14 +120,14 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { updatedPod.ResourceVersion = "1" s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod) }, - wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name), + wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name), }, { name: "Removal of a pod that had nominated node name should trigger rescheduling of lower priority pods", updateFunc: func(s *Scheduler) { s.deletePodFromSchedulingQueue(medNominatedPriorityPod) }, - wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name), + wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name), }, { name: "Addition of a nominated node name to the high priority pod that did not have it before shouldn't trigger rescheduling", @@ -137,7 +137,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { updatedPod.ResourceVersion = "1" s.updatePodInSchedulingQueue(highPriorityPod, updatedPod) }, - wantInActive: sets.New[string](), + wantInActiveOrBackoff: sets.New[string](), }, } @@ -190,12 +190,15 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { t.Errorf("No pods were expected to be in the activeQ before the update, but there were %v", s.SchedulingQueue.PodsInActiveQ()) } tt.updateFunc(s) - if len(s.SchedulingQueue.PodsInActiveQ()) != len(tt.wantInActive) { - t.Errorf("Different number of pods were expected to be in the activeQ, but found actual %v vs. expected %v", s.SchedulingQueue.PodsInActiveQ(), tt.wantInActive) + + podsInActiveOrBackoff := s.SchedulingQueue.PodsInActiveQ() + podsInActiveOrBackoff = append(podsInActiveOrBackoff, s.SchedulingQueue.PodsInBackoffQ()...) 
+ if len(podsInActiveOrBackoff) != len(tt.wantInActiveOrBackoff) { + t.Errorf("Different number of pods were expected to be in the activeQ or backoffQ, but found actual %v vs. expected %v", podsInActiveOrBackoff, tt.wantInActiveOrBackoff) } - for _, pod := range s.SchedulingQueue.PodsInActiveQ() { - if !tt.wantInActive.Has(pod.Name) { - t.Errorf("Found unexpected pod in activeQ: %s", pod.Name) + for _, pod := range podsInActiveOrBackoff { + if !tt.wantInActiveOrBackoff.Has(pod.Name) { + t.Errorf("Found unexpected pod in activeQ or backoffQ: %s", pod.Name) } } }) diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 5fd10a33ddf..fbc02a223a6 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -367,6 +367,9 @@ type QueuedPodInfo struct { // It's used to record the # attempts metric and calculate the backoff time this Pod is obliged to get before retrying. Attempts int // BackoffExpiration is the time when the Pod will complete its backoff. + // If the SchedulerPopFromBackoffQ feature is enabled, the value is aligned to the backoff ordering window. + // Then, two Pods with the same BackoffExpiration (time bucket) are ordered by priority and eventually the timestamp, + // to make sure popping from the backoffQ considers priority of pods that are close to the expiration time. BackoffExpiration time.Time // The time when the pod is added to the queue for the first time. The pod may be added // back to the queue multiple times before it's successfully scheduled. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 27cc8b294f6..0fd972fbc73 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -118,7 +118,7 @@ func (sched *Scheduler) applyDefaultHandlers() { } type schedulerOptions struct { - clock clock.Clock + clock clock.WithTicker componentConfigVersion string kubeConfig *restclient.Config // Overridden by profile level percentageOfNodesToScore if set in v1. @@ -231,7 +231,7 @@ func WithExtenders(e ...schedulerapi.Extender) Option { } // WithClock sets clock for PriorityQueue, the default clock is clock.RealClock. -func WithClock(clock clock.Clock) Option { +func WithClock(clock clock.WithTicker) Option { return func(o *schedulerOptions) { o.clock = clock } diff --git a/test/integration/scheduler/eventhandler/eventhandler_test.go b/test/integration/scheduler/eventhandler/eventhandler_test.go index ef9b266b6b1..7f183c687b9 100644 --- a/test/integration/scheduler/eventhandler/eventhandler_test.go +++ b/test/integration/scheduler/eventhandler/eventhandler_test.go @@ -270,8 +270,8 @@ func TestUpdateNominatedNodeName(t *testing.T) { // Note that the update has to happen since the nominated pod is still in the backoffQ to actually test updates of nominated, but not bound yet pods. tt.updateFunc(testCtx) - // Advance time by the maxPodBackoffSeconds to move low priority pod out of the backoff queue. - fakeClock.Step(testBackoff) + // Advance time by the 2 * maxPodBackoffSeconds to move low priority pod out of the backoff queue. + fakeClock.Step(2 * testBackoff) // Expect the low-priority pod is notified about unnominated mid-pririty pod and gets scheduled, as it should fit this time. 
if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, podLow); err != nil { diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 348f56a384b..a1b6acb7458 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -886,6 +886,9 @@ func TestAsyncPreemption(t *testing.T) { } logger, _ := ktesting.NewTestContext(t) + testCtx.Scheduler.SchedulingQueue.Run(logger) + defer testCtx.Scheduler.SchedulingQueue.Close() + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, true) createdPods := []*v1.Pod{} diff --git a/test/integration/scheduler/queueing/queue.go b/test/integration/scheduler/queueing/queue.go index 581a13e268d..503e05d9d8f 100644 --- a/test/integration/scheduler/queueing/queue.go +++ b/test/integration/scheduler/queueing/queue.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" st "k8s.io/kubernetes/pkg/scheduler/testing" testutils "k8s.io/kubernetes/test/integration/util" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/ptr" ) @@ -2264,6 +2265,7 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) { t.Helper() featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) + logger, _ := ktesting.NewTestContext(t) opts := []scheduler.Option{scheduler.WithPodInitialBackoffSeconds(0), scheduler.WithPodMaxBackoffSeconds(0)} if tt.EnablePlugins != nil { @@ -2303,6 +2305,7 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) { ) testutils.SyncSchedulerInformerFactory(testCtx) + testCtx.Scheduler.SchedulingQueue.Run(logger) defer testCtx.Scheduler.SchedulingQueue.Close() cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
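
To make the effect of the ordering window concrete, here is a minimal standalone Go sketch of the comparison that lessBackoffCompletedWithPriority performs. It is not part of this patch: podStub, its fields, the less helper, and the hard-coded 1-second window are hypothetical stand-ins for QueuedPodInfo and backoffQOrderingWindowDuration; only the truncate-then-compare logic mirrors the change above.

package main

import (
	"fmt"
	"time"
)

// window mirrors backoffQOrderingWindowDuration: backoff expirations are
// bucketed into whole seconds before they are compared.
const window = time.Second

// podStub is a hypothetical, simplified stand-in for QueuedPodInfo.
type podStub struct {
	name              string
	priority          int32
	backoffExpiration time.Time
}

// less sketches the lessBackoffCompletedWithPriority idea: truncate both
// expirations to the ordering window; an earlier window wins, and within the
// same window the higher-priority pod wins (as the activeQ would order them).
func less(a, b podStub) bool {
	wa := a.backoffExpiration.Truncate(window)
	wb := b.backoffExpiration.Truncate(window)
	if !wa.Equal(wb) {
		return wa.Before(wb)
	}
	return a.priority > b.priority
}

func main() {
	base := time.Now().Truncate(window)
	low := podStub{name: "low", priority: 1, backoffExpiration: base.Add(300 * time.Millisecond)}
	high := podStub{name: "high", priority: 2, backoffExpiration: base.Add(800 * time.Millisecond)}
	// Both expirations fall into the same 1s window, so priority decides:
	fmt.Println(less(high, low)) // true: "high" pops first despite a later raw expiration
}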
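
The flushing side can be sketched the same way. The example below reproduces only the shape of waitUntilAlignedWithOrderingWindow, using the standard library timer and ticker instead of the injected clock.WithTicker (an assumption made purely to keep it self-contained; alignThenTick is a hypothetical name): wait for the next whole-second boundary, then run the flush function once per window, re-checking stopCh before every run.

package main

import (
	"fmt"
	"time"
)

// alignThenTick waits for the next whole-second boundary, then invokes f once
// per second until stopCh is closed. It mirrors the align-then-tick pattern of
// waitUntilAlignedWithOrderingWindow, so that every invocation of f sees whole
// ordering windows.
func alignThenTick(f func(), stopCh <-chan struct{}) {
	now := time.Now()
	toNextWindow := now.Truncate(time.Second).Add(time.Second).Sub(now)
	timer := time.NewTimer(toNextWindow)
	select {
	case <-stopCh:
		timer.Stop()
		return
	case <-timer.C:
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		// Re-check stopCh before every invocation, since select has no case priority.
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-ticker.C:
		}
	}
}

func main() {
	stop := make(chan struct{})
	go alignThenTick(func() { fmt.Println("flush at", time.Now().Format("15:04:05.000")) }, stop)
	time.Sleep(3500 * time.Millisecond)
	close(stop)
	time.Sleep(100 * time.Millisecond)
}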