From c7919f5e22e3d85358040892c24b8976d0bbd2fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Skocze=C5=84?=
Date: Mon, 10 Mar 2025 08:56:21 +0000
Subject: [PATCH] Pop from the backoffQ when the activeQ is empty

---
 pkg/scheduler/backend/queue/active_queue.go   |  37 ++++++-
 .../backend/queue/active_queue_test.go        |   2 +-
 pkg/scheduler/backend/queue/backoff_queue.go  |  83 +++++++++++---
 .../backend/queue/backoff_queue_test.go       |  12 ++-
 pkg/scheduler/backend/queue/nominator.go      |   6 +-
 .../backend/queue/scheduling_queue.go         |  59 ++++++----
 .../backend/queue/scheduling_queue_test.go    | 102 ++++++++++++++----
 pkg/scheduler/framework/events.go             |   2 +
 .../eventhandler/eventhandler_test.go         |   2 +
 .../scheduler/plugins/plugins_test.go         |  22 ++++
 .../scheduler/queueing/queue_test.go          |  35 ++++++
 11 files changed, 298 insertions(+), 64 deletions(-)

diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go
index 27293b39a76..14c734d7561 100644
--- a/pkg/scheduler/backend/queue/active_queue.go
+++ b/pkg/scheduler/backend/queue/active_queue.go
@@ -113,14 +113,22 @@ func (uaq *unlockedActiveQueue) has(pInfo *framework.QueuedPodInfo) bool {
 	return uaq.queue.Has(pInfo)
 }
 
+// backoffQPopper defines methods used to pop from the backoffQ when the activeQ is empty.
+type backoffQPopper interface {
+	// popBackoff pops the pInfo from the podBackoffQ.
+	popBackoff() (*framework.QueuedPodInfo, error)
+	// lenBackoff returns the length of the podBackoffQ.
+	lenBackoff() int
+}
+
 // activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
 type activeQueue struct {
 	// lock synchronizes all operations related to activeQ.
 	// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
 	// Caution: DO NOT take "SchedulingQueue.lock" after taking "lock".
 	// You should always take "SchedulingQueue.lock" first, otherwise the queue could end up in deadlock.
-	// "lock" should not be taken after taking "nLock".
-	// Correct locking order is: SchedulingQueue.lock > lock > nominator.nLock.
+	// "lock" should not be taken after taking "backoffQueue.lock" or "nominator.nLock".
+	// Correct locking order is: SchedulingQueue.lock > lock > backoffQueue.lock > nominator.nLock.
 	lock sync.RWMutex
 
 	// activeQ is heap structure that scheduler actively looks at to find pods to
@@ -132,6 +140,8 @@ type activeQueue struct {
 	unlockedQueue *unlockedActiveQueue
 
 	// cond is a condition that is notified when the pod is added to activeQ.
+	// When the SchedulerPopFromBackoffQ feature is enabled,
+	// the condition is also notified when a pod is added to backoffQ.
 	// It is used with lock.
 	cond sync.Cond
 
@@ -171,9 +181,13 @@ type activeQueue struct {
 	isSchedulingQueueHintEnabled bool
 
 	metricsRecorder metrics.MetricAsyncRecorder
+
+	// backoffQPopper is used to pop from backoffQ when activeQ is empty.
+	// It is non-nil only when SchedulerPopFromBackoffQ feature is enabled.
+	backoffQPopper backoffQPopper
 }
 
-func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue {
+func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder, backoffQPopper backoffQPopper) *activeQueue {
 	aq := &activeQueue{
 		queue:        queue,
 		inFlightPods: make(map[types.UID]*list.Element),
@@ -181,6 +195,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
 		isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
 		metricsRecorder:              metricRecorder,
 		unlockedQueue:                newUnlockedActiveQueue(queue),
+		backoffQPopper:               backoffQPopper,
 	}
 	aq.cond.L = &aq.lock
 
@@ -238,7 +253,13 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
 }
 
 func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
+	var pInfo *framework.QueuedPodInfo
 	for aq.queue.Len() == 0 {
+		// backoffQPopper is non-nil only if SchedulerPopFromBackoffQ feature is enabled.
+		// In case of non-empty backoffQ, try popping from there.
+		if aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() != 0 {
+			break
+		}
 		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
 		// When Close() is called, the p.closed is set and the condition is broadcast,
 		// which causes this loop to continue and return from the Pop().
@@ -250,7 +271,15 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo
 	}
 	pInfo, err := aq.queue.Pop()
 	if err != nil {
-		return nil, err
+		if aq.backoffQPopper == nil {
+			return nil, err
+		}
+		// Try to pop from backoffQ when activeQ is empty.
+		pInfo, err = aq.backoffQPopper.popBackoff()
+		if err != nil {
+			return nil, err
+		}
+		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", framework.PopFromBackoffQ).Inc()
 	}
 	pInfo.Attempts++
 	pInfo.BackoffExpiration = time.Time{}
diff --git a/pkg/scheduler/backend/queue/active_queue_test.go b/pkg/scheduler/backend/queue/active_queue_test.go
index df4b553db82..b49f786d4f6 100644
--- a/pkg/scheduler/backend/queue/active_queue_test.go
+++ b/pkg/scheduler/backend/queue/active_queue_test.go
@@ -30,7 +30,7 @@ import (
 func TestClose(t *testing.T) {
 	logger, ctx := ktesting.NewTestContext(t)
 	rr := metrics.NewMetricsAsyncRecorder(10, time.Second, ctx.Done())
-	aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr)
+	aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr, nil)
 
 	aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
 		unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label())
diff --git a/pkg/scheduler/backend/queue/backoff_queue.go b/pkg/scheduler/backend/queue/backoff_queue.go
index 41c053389d3..6e0bbd6808c 100644
--- a/pkg/scheduler/backend/queue/backoff_queue.go
+++ b/pkg/scheduler/backend/queue/backoff_queue.go
@@ -17,6 +17,7 @@ limitations under the License.
 package queue
 
 import (
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -35,13 +36,14 @@ import (
 const backoffQOrderingWindowDuration = time.Second
 
 // backoffQueuer is a wrapper for backoffQ related operations.
+// Its methods that rely on the queues take the lock inside.
 type backoffQueuer interface {
 	// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
 	// If this returns true, the pod should not be re-tried.
 	// If the pod backoff time is in the actual ordering window, it should still be backing off.
 	isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
-	// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
-	popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
+	// popAllBackoffCompleted pops all pods from podBackoffQ and podErrorBackoffQ that completed backoff.
+	popAllBackoffCompleted(logger klog.Logger) []*framework.QueuedPodInfo
 
 	// podInitialBackoffDuration returns initial backoff duration that pod can get.
 	podInitialBackoffDuration() time.Duration
@@ -61,7 +63,8 @@ type backoffQueuer interface {
 	// It returns new pod info if updated, nil otherwise.
 	update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
 	// delete deletes the pInfo from backoffQueue.
-	delete(pInfo *framework.QueuedPodInfo)
+	// It returns true if the pod was deleted.
+	delete(pInfo *framework.QueuedPodInfo) bool
 	// get returns the pInfo matching given pInfoLookup, if exists.
 	get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
 	// has inform if pInfo exists in the queue.
 	has(pInfo *framework.QueuedPodInfo) bool
@@ -75,6 +78,14 @@ type backoffQueuer interface {
 // backoffQueue implements backoffQueuer and wraps two queues inside,
 // providing seamless access as if it were one queue.
 type backoffQueue struct {
+	// lock synchronizes all operations related to backoffQ.
+	// It protects both podBackoffQ and podErrorBackoffQ.
+	// Caution: DO NOT take "SchedulingQueue.lock" or "activeQueue.lock" after taking "lock".
+	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first, otherwise the queue could end up in deadlock.
+	// "lock" should not be taken after taking "nominator.nLock".
+	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > lock > nominator.nLock.
+	lock sync.RWMutex
+
 	clock clock.WithTicker
 
 	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
@@ -239,7 +250,8 @@ func (bq *backoffQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInf
 	return duration
 }
 
-func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo), queue *heap.Heap[*framework.QueuedPodInfo]) {
+func (bq *backoffQueue) popAllBackoffCompletedWithQueue(logger klog.Logger, queue *heap.Heap[*framework.QueuedPodInfo]) []*framework.QueuedPodInfo {
+	var poppedPods []*framework.QueuedPodInfo
 	for {
 		pInfo, ok := queue.Peek()
 		if !ok || pInfo == nil {
 			break
 		}
@@ -254,23 +266,27 @@
 			logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
 			break
 		}
-		if fn != nil {
-			fn(pInfo)
-		}
+		poppedPods = append(poppedPods, pInfo)
 	}
+	return poppedPods
 }
 
-// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
-func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo)) {
+// popAllBackoffCompleted pops all pods from podBackoffQ and podErrorBackoffQ that completed backoff.
+func (bq *backoffQueue) popAllBackoffCompleted(logger klog.Logger) []*framework.QueuedPodInfo {
+	bq.lock.Lock()
+	defer bq.lock.Unlock()
+
 	// Ensure both queues are called
-	bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podBackoffQ)
-	bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podErrorBackoffQ)
+	return append(bq.popAllBackoffCompletedWithQueue(logger, bq.podBackoffQ), bq.popAllBackoffCompletedWithQueue(logger, bq.podErrorBackoffQ)...)
 }
 
 // add adds the pInfo to backoffQueue.
 // The event should show which event triggered this addition and is used for the metric recording.
 // It also ensures that pInfo is not in both queues.
 func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) {
+	bq.lock.Lock()
+	defer bq.lock.Unlock()
+
 	// If pod has empty both unschedulable plugins and pending plugins,
 	// it means that it failed because of error and should be moved to podErrorBackoffQ.
 	if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
@@ -297,6 +313,9 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo,
 // update updates the pod in backoffQueue if oldPodInfo is already in the queue.
 // It returns new pod info if updated, nil otherwise.
 func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
+	bq.lock.Lock()
+	defer bq.lock.Unlock()
+
 	// If the pod is in the backoff queue, update it there.
 	if pInfo, exists := bq.podBackoffQ.Get(oldPodInfo); exists {
 		_ = pInfo.Update(newPod)
@@ -313,13 +332,32 @@ func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodIn
 }
 
 // delete deletes the pInfo from backoffQueue.
-func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) {
-	_ = bq.podBackoffQ.Delete(pInfo)
-	_ = bq.podErrorBackoffQ.Delete(pInfo)
+// It returns true if the pod was deleted.
+func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) bool {
+	bq.lock.Lock()
+	defer bq.lock.Unlock()
+
+	if bq.podBackoffQ.Delete(pInfo) == nil {
+		return true
+	}
+	return bq.podErrorBackoffQ.Delete(pInfo) == nil
+}
+
+// popBackoff pops the pInfo from the podBackoffQ.
+// It returns an error if the queue is empty.
+// This doesn't pop the pods from the podErrorBackoffQ.
+func (bq *backoffQueue) popBackoff() (*framework.QueuedPodInfo, error) {
+	bq.lock.Lock()
+	defer bq.lock.Unlock()
+
+	return bq.podBackoffQ.Pop()
 }
 
 // get returns the pInfo matching given pInfoLookup, if exists.
 func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
+	bq.lock.RLock()
+	defer bq.lock.RUnlock()
+
 	pInfo, exists := bq.podBackoffQ.Get(pInfoLookup)
 	if exists {
 		return pInfo, true
@@ -329,11 +367,17 @@ func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.Qu
 
 // has inform if pInfo exists in the queue.
 func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
+	bq.lock.RLock()
+	defer bq.lock.RUnlock()
+
 	return bq.podBackoffQ.Has(pInfo) || bq.podErrorBackoffQ.Has(pInfo)
 }
 
 // list returns all pods that are in the queue.
 func (bq *backoffQueue) list() []*v1.Pod {
+	bq.lock.RLock()
+	defer bq.lock.RUnlock()
+
 	var result []*v1.Pod
 	for _, pInfo := range bq.podBackoffQ.List() {
 		result = append(result, pInfo.Pod)
@@ -346,5 +390,16 @@ func (bq *backoffQueue) list() []*v1.Pod {
 
 // len returns length of the queue.
 func (bq *backoffQueue) len() int {
+	bq.lock.RLock()
+	defer bq.lock.RUnlock()
+
 	return bq.podBackoffQ.Len() + bq.podErrorBackoffQ.Len()
 }
+
+// lenBackoff returns the length of the podBackoffQ.
+func (bq *backoffQueue) lenBackoff() int {
+	bq.lock.RLock()
+	defer bq.lock.RUnlock()
+
+	return bq.podBackoffQ.Len()
+}
diff --git a/pkg/scheduler/backend/queue/backoff_queue_test.go b/pkg/scheduler/backend/queue/backoff_queue_test.go
index c26e3271262..8ebf7799729 100644
--- a/pkg/scheduler/backend/queue/backoff_queue_test.go
+++ b/pkg/scheduler/backend/queue/backoff_queue_test.go
@@ -78,7 +78,7 @@ func TestBackoffQueue_calculateBackoffDuration(t *testing.T) {
 	}
 }
 
-func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
+func TestBackoffQueue_popAllBackoffCompleted(t *testing.T) {
 	fakeClock := testingclock.NewFakeClock(time.Now())
 	podInfos := map[string]*framework.QueuedPodInfo{
 		"pod0": {
@@ -156,10 +156,11 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
 			for _, podName := range tt.podsInBackoff {
 				bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
 			}
+			gotPodInfos := bq.popAllBackoffCompleted(logger)
 			var gotPods []string
-			bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
+			for _, pInfo := range gotPodInfos {
 				gotPods = append(gotPods, pInfo.Pod.Name)
-			})
+			}
 			if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
 				t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
 			}
@@ -248,10 +249,11 @@ func TestBackoffQueueOrdering(t *testing.T) {
 			for _, podInfo := range podInfos {
 				bq.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label())
 			}
+			gotPodInfos := bq.popAllBackoffCompleted(logger)
 			var gotPods []string
-			bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
+			for _, pInfo := range gotPodInfos {
 				gotPods = append(gotPods, pInfo.Pod.Name)
-			})
+			}
 			if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
 				t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
 			}
diff --git a/pkg/scheduler/backend/queue/nominator.go b/pkg/scheduler/backend/queue/nominator.go
index 59f7eaa4802..4d6ceaabcc0 100644
--- a/pkg/scheduler/backend/queue/nominator.go
+++ b/pkg/scheduler/backend/queue/nominator.go
@@ -35,10 +35,10 @@ import (
 type nominator struct {
 	// nLock synchronizes all operations related to nominator.
 	// It should not be used anywhere else.
-	// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
-	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
+	// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock" or "backoffQueue.lock") after taking "nLock".
+	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" and "backoffQueue.lock" first,
 	// otherwise the nominator could end up in deadlock.
-	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
+	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > backoffQueue.lock > nLock.
 	nLock sync.RWMutex
 
 	// podLister is used to verify if the given pod is alive.
diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go
index 340675c4f9d..d3e3efba785 100644
--- a/pkg/scheduler/backend/queue/scheduling_queue.go
+++ b/pkg/scheduler/backend/queue/scheduling_queue.go
@@ -160,8 +160,8 @@ type PriorityQueue struct {
 	clock clock.WithTicker
 
 	// lock takes precedence and should be taken first,
-	// before any other locks in the queue (activeQueue.lock or nominator.nLock).
-	// Correct locking order is: lock > activeQueue.lock > nominator.nLock.
+	// before any other locks in the queue (activeQueue.lock or backoffQueue.lock or nominator.nLock).
+	// Correct locking order is: lock > activeQueue.lock > backoffQueue.lock > nominator.nLock.
 	lock sync.RWMutex
 
 	// the maximum time a pod can stay in the unschedulablePods.
@@ -331,12 +331,12 @@ func NewPriorityQueue(
 	isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints)
 	isPopFromBackoffQEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerPopFromBackoffQ)
 
+	backoffQ := newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration, lessFn, isPopFromBackoffQEnabled)
 	pq := &PriorityQueue{
 		clock:                             options.clock,
 		stop:                              make(chan struct{}),
 		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
-		activeQ:                           newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder),
-		backoffQ:                          newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration, lessFn, isPopFromBackoffQEnabled),
+		backoffQ:                          backoffQ,
 		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
 		preEnqueuePluginMap:               options.preEnqueuePluginMap,
 		queueingHintMap:                   options.queueingHintMap,
@@ -346,6 +346,11 @@ func NewPriorityQueue(
 		isSchedulingQueueHintEnabled:      isSchedulingQueueHintEnabled,
 		isPopFromBackoffQEnabled:          isPopFromBackoffQEnabled,
 	}
+	var backoffQPopper backoffQPopper
+	if isPopFromBackoffQEnabled {
+		backoffQPopper = backoffQ
+	}
+	pq.activeQ = newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder, backoffQPopper)
 	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
 	pq.nominator = newPodNominator(options.podLister)
@@ -672,6 +677,12 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
 		if !exists {
 			return false
 		}
+		// Delete pod from the backoffQ now to make sure it won't be popped from the backoffQ
+		// just before moving it to the activeQ.
+		if deleted := p.backoffQ.delete(pInfo); !deleted {
+			// Pod was popped from the backoffQ in the meantime. Don't activate it.
+			return false
+		}
 	}
 
 	if pInfo == nil {
@@ -756,7 +767,11 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
 		//   - No unschedulable plugins are associated with this Pod,
 		//     meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
 		// In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
-		_ = p.moveToBackoffQ(logger, pInfo, framework.ScheduleAttemptFailure)
+		if added := p.moveToBackoffQ(logger, pInfo, framework.ScheduleAttemptFailure); added {
+			if p.isPopFromBackoffQEnabled {
+				p.activeQ.broadcast()
+			}
+		}
 	} else {
 		p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure)
 		logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods)
@@ -809,7 +824,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
 		// In this case, we try to requeue this Pod to activeQ/backoffQ.
 		queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, framework.ScheduleAttemptFailure)
 		logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint, "unschedulable plugins", rejectorPlugins)
-		if queue == activeQ {
+		if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) {
 			// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
 			p.activeQ.broadcast()
 		}
@@ -822,11 +837,12 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	activated := false
-	p.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
+	podsCompletedBackoff := p.backoffQ.popAllBackoffCompleted(logger)
+	for _, pInfo := range podsCompletedBackoff {
 		if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
 			activated = true
 		}
-	})
+	}
 	if activated {
 		p.activeQ.broadcast()
 	}
@@ -954,7 +970,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
 				logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", evt.Label(), "queue", queue)
 				p.unschedulablePods.delete(pInfo.Pod, gated)
 			}
-			if queue == activeQ {
+			if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) {
 				p.activeQ.broadcast()
 				break
 			}
@@ -967,6 +983,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
 			if p.backoffQ.isPodBackingoff(pInfo) {
 				if added := p.moveToBackoffQ(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()); added {
 					p.unschedulablePods.delete(pInfo.Pod, gated)
+					if p.isPopFromBackoffQEnabled {
+						p.activeQ.broadcast()
+					}
 				}
 				return
 			}
@@ -995,12 +1014,14 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) {
 	defer p.lock.Unlock()
 	p.DeleteNominatedPodIfExists(pod)
 	pInfo := newQueuedPodInfoForLookup(pod)
-	if err := p.activeQ.delete(pInfo); err != nil {
-		// The item was probably not found in the activeQ.
-		p.backoffQ.delete(pInfo)
-		if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
-			p.unschedulablePods.delete(pod, pInfo.Gated)
-		}
+	if err := p.activeQ.delete(pInfo); err == nil {
+		return
+	}
+	if deleted := p.backoffQ.delete(pInfo); deleted {
+		return
+	}
+	if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
+		p.unschedulablePods.delete(pod, pInfo.Gated)
 	}
 }
@@ -1127,7 +1148,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
 		p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
 		queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label())
 		logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "queue", queue, "hint", schedulingHint)
-		if queue == activeQ {
+		if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) {
 			activated = true
 		}
 	}
@@ -1222,11 +1243,13 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
 	defer p.lock.RUnlock()
 	result := p.activeQ.list()
 	activeQLen := len(result)
-	result = append(result, p.backoffQ.list()...)
+	backoffQPods := p.backoffQ.list()
+	backoffQLen := len(backoffQPods)
+	result = append(result, backoffQPods...)
 	for _, pInfo := range p.unschedulablePods.podInfoMap {
 		result = append(result, pInfo.Pod)
 	}
-	return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.backoffQ.len(), len(p.unschedulablePods.podInfoMap))
+	return result, fmt.Sprintf(pendingPodsSummary, activeQLen, backoffQLen, len(p.unschedulablePods.podInfoMap))
 }
 
 // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go
index baab4dd441c..d221ff2617e 100644
--- a/pkg/scheduler/backend/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go
@@ -1010,25 +1010,88 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
 	}
 }
 
-func TestPriorityQueue_Pop(t *testing.T) {
-	objs := []runtime.Object{medPriorityPodInfo.Pod}
-	logger, ctx := ktesting.NewTestContext(t)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+// tryPop tries to pop one pod from the queue and returns it.
+// It waits 5 seconds before timing out, assuming the queue is then empty.
+func tryPop(t *testing.T, logger klog.Logger, q *PriorityQueue) *framework.QueuedPodInfo {
+	t.Helper()
+
+	var gotPod *framework.QueuedPodInfo
+	popped := make(chan struct{}, 1)
 	go func() {
-		defer wg.Done()
-		if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
-			t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
+		pod, err := q.Pop(logger)
+		if err != nil {
+			t.Errorf("Failed to pop pod from scheduling queue: %s", err)
 		}
-		if len(q.nominator.nominatedPods["node1"]) != 1 {
-			t.Errorf("Expected medPriorityPodInfo to be present in nominatedPods: %v", q.nominator.nominatedPods["node1"])
+		if pod != nil {
+			gotPod = pod
 		}
+		popped <- struct{}{}
 	}()
-	q.Add(logger, medPriorityPodInfo.Pod)
-	wg.Wait()
+
+	timer := time.NewTimer(5 * time.Second)
+	select {
+	case <-timer.C:
+		q.Close()
+	case <-popped:
+		timer.Stop()
+	}
+	return gotPod
+}
+
+func TestPriorityQueue_Pop(t *testing.T) {
+	highPriorityPodInfo2 := mustNewPodInfo(
+		st.MakePod().Name("hpp2").Namespace("ns1").UID("hpp2ns1").Priority(highPriority).Obj(),
+	)
+	objs := []runtime.Object{medPriorityPodInfo.Pod, highPriorityPodInfo.Pod, highPriorityPodInfo2.Pod, unschedulablePodInfo.Pod}
+	tests := []struct {
+		name                   string
+		popFromBackoffQEnabled bool
+		wantPods               []string
+	}{
+		{
+			name:                   "Pop pods from both activeQ and backoffQ when PopFromBackoffQ is enabled",
+			popFromBackoffQEnabled: true,
+			wantPods:               []string{medPriorityPodInfo.Pod.Name, highPriorityPodInfo.Pod.Name},
+		},
+		{
+			name:                   "Pop pod only from activeQ when PopFromBackoffQ is disabled",
+			popFromBackoffQEnabled: false,
+			wantPods:               []string{medPriorityPodInfo.Pod.Name},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, tt.popFromBackoffQEnabled)
+			logger, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
+
+			// Add medium priority pod to the activeQ
+			q.Add(logger, medPriorityPodInfo.Pod)
+			// Add high priority pod to the backoffQ
+			backoffPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "plugin")
+			q.backoffQ.add(logger, backoffPodInfo, framework.EventUnscheduledPodAdd.Label())
+			// Add high priority pod to the errorBackoffQ
+			errorBackoffPodInfo := q.newQueuedPodInfo(highPriorityPodInfo2.Pod)
+			q.backoffQ.add(logger, errorBackoffPodInfo, framework.EventUnscheduledPodAdd.Label())
+			// Add pod to the unschedulablePods
+			unschedulablePodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "plugin")
+			q.unschedulablePods.addOrUpdate(unschedulablePodInfo, framework.EventUnscheduledPodAdd.Label())
+
+			var gotPods []string
+			for i := 0; i < len(tt.wantPods)+1; i++ {
+				gotPod := tryPop(t, logger, q)
+				if gotPod == nil {
+					break
+				}
+				gotPods = append(gotPods, gotPod.Pod.Name)
+			}
+			if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
+				t.Errorf("Unexpected popped pods (-want, +got): %s", diff)
+			}
+		})
+	}
 }
 
 func TestPriorityQueue_Update(t *testing.T) {
@@ -1951,7 +2014,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 	// pop out the pods in the backoffQ.
 	// This doesn't make them in-flight pods.
 	c.Step(q.backoffQ.podMaxBackoffDuration())
-	q.backoffQ.popEachBackoffCompleted(logger, nil)
+	_ = q.backoffQ.popAllBackoffCompleted(logger)
 
 	expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
 	q.Add(logger, unschedulablePodInfo.Pod)
@@ -2074,7 +2137,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
 	// pop out the pods in the backoffQ.
 	// This doesn't make them in-flight pods.
 	c.Step(q.backoffQ.podMaxBackoffDuration())
-	q.backoffQ.popEachBackoffCompleted(logger, nil)
+	_ = q.backoffQ.popAllBackoffCompleted(logger)
 
 	unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"))
 	highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"))
@@ -3883,9 +3946,10 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
 			q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck)
 			got := sets.New[string]()
 			c.Step(2 * q.backoffQ.podMaxBackoffDuration())
-			q.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
+			gotPodInfos := q.backoffQ.popAllBackoffCompleted(logger)
+			for _, pInfo := range gotPodInfos {
 				got.Insert(pInfo.Pod.Name)
-			})
+			}
 			if diff := cmp.Diff(tt.want, got); diff != "" {
 				t.Errorf("Unexpected diff (-want, +got):\n%s", diff)
 			}
diff --git a/pkg/scheduler/framework/events.go b/pkg/scheduler/framework/events.go
index ab4c31a3f9e..1fe1b0b972b 100644
--- a/pkg/scheduler/framework/events.go
+++ b/pkg/scheduler/framework/events.go
@@ -31,6 +31,8 @@ const (
 	ScheduleAttemptFailure = "ScheduleAttemptFailure"
 	// BackoffComplete is the event when a pod finishes backoff.
 	BackoffComplete = "BackoffComplete"
+	// PopFromBackoffQ is the event when a pod is popped from backoffQ when activeQ is empty.
+	PopFromBackoffQ = "PopFromBackoffQ"
 	// ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ
 	// to activeQ. Usually it's triggered by plugin implementations.
 	ForceActivate = "ForceActivate"
diff --git a/test/integration/scheduler/eventhandler/eventhandler_test.go b/test/integration/scheduler/eventhandler/eventhandler_test.go
index 7f183c687b9..cae1d6996d9 100644
--- a/test/integration/scheduler/eventhandler/eventhandler_test.go
+++ b/test/integration/scheduler/eventhandler/eventhandler_test.go
@@ -212,6 +212,8 @@ func TestUpdateNominatedNodeName(t *testing.T) {
 		for _, qHintEnabled := range []bool{false, true} {
 			t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) {
 				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
+				// Set the SchedulerPopFromBackoffQ feature to false, because when it's enabled, we can't be sure the pod won't be popped from the backoffQ.
+				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerPopFromBackoffQ, false)
 
 				testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true,
 					scheduler.WithClock(fakeClock),
diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go
index d359b513d40..f06ded973ec 100644
--- a/test/integration/scheduler/plugins/plugins_test.go
+++ b/test/integration/scheduler/plugins/plugins_test.go
@@ -309,14 +309,18 @@ var _ framework.PreFilterPlugin = &PreFilterPlugin{}
 var _ framework.PostFilterPlugin = &PostFilterPlugin{}
 var _ framework.ScorePlugin = &ScorePlugin{}
 var _ framework.FilterPlugin = &FilterPlugin{}
+var _ framework.EnqueueExtensions = &FilterPlugin{}
 var _ framework.ScorePlugin = &ScorePlugin{}
 var _ framework.ScorePlugin = &ScoreWithNormalizePlugin{}
+var _ framework.EnqueueExtensions = &ScorePlugin{}
 var _ framework.ReservePlugin = &ReservePlugin{}
 var _ framework.PreScorePlugin = &PreScorePlugin{}
 var _ framework.PreBindPlugin = &PreBindPlugin{}
+var _ framework.EnqueueExtensions = &PreBindPlugin{}
 var _ framework.BindPlugin = &BindPlugin{}
 var _ framework.PostBindPlugin = &PostBindPlugin{}
 var _ framework.PermitPlugin = &PermitPlugin{}
+var _ framework.EnqueueExtensions = &PermitPlugin{}
 var _ framework.QueueSortPlugin = &QueueSortPlugin{}
 
 func (ep *QueueSortPlugin) Name() string {
@@ -377,6 +381,10 @@ func (sp *ScorePlugin) ScoreExtensions() framework.ScoreExtensions {
 	return nil
 }
 
+func (sp *ScorePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
+	return nil, nil
+}
+
 // Name returns name of the score plugin.
 func (sp *ScoreWithNormalizePlugin) Name() string {
 	return scoreWithNormalizePluginName
@@ -427,6 +435,12 @@ func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState,
 	return nil
 }
 
+func (fp *FilterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
+	return []framework.ClusterEventWithHint{
+		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
+	}, nil
+}
+
 // Name returns name of the plugin.
 func (rp *ReservePlugin) Name() string {
 	return rp.name
@@ -491,6 +505,10 @@ func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleStat
 	return nil
 }
 
+func (pp *PreBindPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
+	return nil, nil
+}
+
 const bindPluginAnnotation = "bindPluginName"
 
 func (bp *BindPlugin) Name() string {
@@ -651,6 +669,10 @@ func (pp *PermitPlugin) rejectAllPods() {
 	pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") })
 }
 
+func (pp *PermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
+	return nil, nil
+}
+
 // TestPreFilterPlugin tests invocation of prefilter plugins.
 func TestPreFilterPlugin(t *testing.T) {
 	testContext := testutils.InitTestAPIServer(t, "prefilter-plugin", nil)
diff --git a/test/integration/scheduler/queueing/queue_test.go b/test/integration/scheduler/queueing/queue_test.go
index fc6acc4f7f9..f12ecb0752f 100644
--- a/test/integration/scheduler/queueing/queue_test.go
+++ b/test/integration/scheduler/queueing/queue_test.go
@@ -581,3 +581,38 @@ func (p *fakePermitPlugin) EventsToRegister(_ context.Context) ([]framework.Clus
 		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
 	}, nil
 }
+
+func TestPopFromBackoffQWhenActiveQEmpty(t *testing.T) {
+	// Set initial backoff to 1000s to make sure pod won't go to the activeQ after being requeued.
+	testCtx := testutils.InitTestSchedulerWithNS(t, "pop-from-backoffq", scheduler.WithPodInitialBackoffSeconds(1000), scheduler.WithPodMaxBackoffSeconds(1000))
+	cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
+
+	// Create node, so we can schedule pods.
+	node := st.MakeNode().Name("node").Obj()
+	if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
+		t.Fatal("Failed to create node")
+	}
+
+	// Create a pod that will be unschedulable.
+	pod := st.MakePod().Namespace(ns).Name("pod").NodeAffinityIn("foo", []string{"bar"}, st.NodeSelectorTypeMatchExpressions).Container(imageutils.GetPauseImageName()).Obj()
+	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Failed to create pod: %v", err)
+	}
+
+	err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodUnschedulable(cs, ns, pod.Name))
+	if err != nil {
+		t.Fatalf("Expected pod to be unschedulable: %v", err)
+	}
+
+	// Create node with label to make the pod schedulable.
+	node2 := st.MakeNode().Name("node-schedulable").Label("foo", "bar").Obj()
+	if _, err := cs.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{}); err != nil {
+		t.Fatal("Failed to create node-schedulable")
+	}
+
+	// Pod should be scheduled, even if it was in the backoffQ, because PopFromBackoffQ feature is enabled.
+	err = wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, pod.Name))
+	if err != nil {
+		t.Fatalf("Expected pod to be scheduled: %v", err)
+	}
+}
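
Reviewer note: the heart of this patch is the fallback in unlockedPop — a popper that would otherwise block on the empty activeQ first checks whether the backoffQ has work, and every path that adds to the backoffQ now broadcasts on the activeQ's condition variable so a parked popper wakes up. The following stand-alone Go sketch (illustrative only; all names are hypothetical, not the scheduler's API) shows that pattern in miniature:

	// Minimal sketch of the "pop from a fallback queue when the primary is
	// empty" pattern, assuming plain string items and a single sync.Cond.
	package main

	import (
		"errors"
		"fmt"
		"sync"
	)

	type twoQueues struct {
		mu      sync.Mutex
		cond    *sync.Cond
		active  []string // popped first, like the activeQ
		backoff []string // fallback, like the podBackoffQ
		closed  bool
	}

	func newTwoQueues() *twoQueues {
		q := &twoQueues{}
		q.cond = sync.NewCond(&q.mu)
		return q
	}

	// addActive enqueues to the primary queue and wakes any blocked pop.
	func (q *twoQueues) addActive(s string) {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.active = append(q.active, s)
		q.cond.Broadcast()
	}

	// addBackoff enqueues to the fallback queue. It must also broadcast:
	// without the wake-up, a pop already parked in Wait() would never learn
	// that the fallback queue has work — the same reason the patch makes
	// moveToBackoffQ callers broadcast when the feature is enabled.
	func (q *twoQueues) addBackoff(s string) {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.backoff = append(q.backoff, s)
		q.cond.Broadcast()
	}

	func (q *twoQueues) close() {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.closed = true
		q.cond.Broadcast()
	}

	// pop returns the next item, preferring active and falling back to backoff.
	func (q *twoQueues) pop() (string, error) {
		q.mu.Lock()
		defer q.mu.Unlock()
		for len(q.active) == 0 {
			if len(q.backoff) != 0 {
				break // fall through and pop from the fallback queue
			}
			if q.closed {
				return "", errors.New("queue closed")
			}
			q.cond.Wait() // blocks until an add or close broadcasts
		}
		if len(q.active) > 0 {
			s := q.active[0]
			q.active = q.active[1:]
			return s, nil
		}
		s := q.backoff[0]
		q.backoff = q.backoff[1:]
		return s, nil
	}

	func main() {
		q := newTwoQueues()
		q.addBackoff("pod-in-backoff")
		s, _ := q.pop() // active is empty, so the backoff item is returned
		fmt.Println(s)  // pod-in-backoff
	}

The sketch also makes the activate() race visible: between lenBackoff() reporting non-empty and the actual popBackoff(), another goroutine may remove the pod, which is why the patch has activate() treat a failed backoffQ delete as "already popped, don't activate".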