From 0f24b9ff4518207beb9926c3dff7693b54a394ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Skocze=C5=84?=
Date: Tue, 18 Feb 2025 08:23:37 +0000
Subject: [PATCH] Split backoffQ into backoffQ and errorBackoffQ in scheduler

---
 pkg/scheduler/backend/queue/backoff_queue.go  | 238 ++++++++++++++++++
 .../backend/queue/backoff_queue_test.go       | 170 +++++++++++++
 .../backend/queue/scheduling_queue.go         | 114 ++-------
 .../backend/queue/scheduling_queue_test.go    | 159 ++++--------
 4 files changed, 490 insertions(+), 191 deletions(-)
 create mode 100644 pkg/scheduler/backend/queue/backoff_queue.go
 create mode 100644 pkg/scheduler/backend/queue/backoff_queue_test.go

diff --git a/pkg/scheduler/backend/queue/backoff_queue.go b/pkg/scheduler/backend/queue/backoff_queue.go
new file mode 100644
index 00000000000..2af772a3664
--- /dev/null
+++ b/pkg/scheduler/backend/queue/backoff_queue.go
@@ -0,0 +1,238 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/backend/heap"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/metrics"
+	"k8s.io/utils/clock"
+)
+
+// backoffQueuer is a wrapper for backoffQ related operations.
+type backoffQueuer interface {
+	// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
+	// If this returns true, the pod should not be re-tried.
+	isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
+	// getBackoffTime returns the time that podInfo completes its backoff.
+	getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time
+	// popEachBackoffCompleted runs fn for all pods in podBackoffQ and podErrorBackoffQ that have completed backoff, popping them from the queues.
+	popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
+
+	// podInitialBackoffDuration returns the initial backoff duration that a pod can get.
+	podInitialBackoffDuration() time.Duration
+	// podMaxBackoffDuration returns the maximum backoff duration that a pod can get.
+	podMaxBackoffDuration() time.Duration
+
+	// add adds the pInfo to backoffQueue.
+	// It also ensures that pInfo is not present in both internal queues at once.
+	add(logger klog.Logger, pInfo *framework.QueuedPodInfo)
+	// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
+	// It returns the new pod info if updated, nil otherwise.
+	update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
+	// delete deletes the pInfo from backoffQueue.
+	delete(pInfo *framework.QueuedPodInfo)
+	// get returns the pInfo matching the given pInfoLookup, if it exists.
+	get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
+	// has informs whether pInfo exists in the queue.
+	has(pInfo *framework.QueuedPodInfo) bool
+	// list returns all pods that are in the queue.
+	list() []*framework.QueuedPodInfo
+	// len returns the length of the queue.
+	len() int
+}
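+
+// Illustrative flow, as wired up by PriorityQueue (a sketch, not part of this
+// patch; logger and pInfo are assumed to be in scope):
+//
+//	bq := newBackoffQueue(clock.RealClock{}, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration)
+//	bq.add(logger, pInfo) // routed to podBackoffQ, or to podErrorBackoffQ for error-only failures
+//	bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
+//		// move pInfo to activeQ
+//	})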
+
+// backoffQueue implements backoffQueuer and wraps two queues inside,
+// providing seamless access as if it were one queue.
+type backoffQueue struct {
+	clock clock.Clock
+
+	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
+	// are popped from this heap before the scheduler looks at activeQ.
+	podBackoffQ *heap.Heap[*framework.QueuedPodInfo]
+	// podErrorBackoffQ is a heap ordered by error backoff expiry. Pods which have completed backoff
+	// are popped from this heap before the scheduler looks at activeQ.
+	podErrorBackoffQ *heap.Heap[*framework.QueuedPodInfo]
+
+	podInitialBackoff time.Duration
+	podMaxBackoff     time.Duration
+}
+
+func newBackoffQueue(clock clock.Clock, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration) *backoffQueue {
+	bq := &backoffQueue{
+		clock:             clock,
+		podInitialBackoff: podInitialBackoffDuration,
+		podMaxBackoff:     podMaxBackoffDuration,
+	}
+	bq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
+	bq.podErrorBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, bq.lessBackoffCompleted, metrics.NewBackoffPodsRecorder())
+
+	return bq
+}
+
+// podInitialBackoffDuration returns the initial backoff duration that a pod can get.
+func (bq *backoffQueue) podInitialBackoffDuration() time.Duration {
+	return bq.podInitialBackoff
+}
+
+// podMaxBackoffDuration returns the maximum backoff duration that a pod can get.
+func (bq *backoffQueue) podMaxBackoffDuration() time.Duration {
+	return bq.podMaxBackoff
+}
+
+// lessBackoffCompleted is the less function used by both podBackoffQ and podErrorBackoffQ.
+func (bq *backoffQueue) lessBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
+	bo1 := bq.getBackoffTime(pInfo1)
+	bo2 := bq.getBackoffTime(pInfo2)
+	return bo1.Before(bo2)
+}
+
+// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
+// If this returns true, the pod should not be re-tried.
+func (bq *backoffQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
+	boTime := bq.getBackoffTime(podInfo)
+	return boTime.After(bq.clock.Now())
+}
+
+// getBackoffTime returns the time that podInfo completes its backoff.
+func (bq *backoffQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
+	duration := bq.calculateBackoffDuration(podInfo)
+	backoffTime := podInfo.Timestamp.Add(duration)
+	return backoffTime
+}
+
+// calculateBackoffDuration is a helper function for calculating the backoffDuration
+// based on the number of attempts the pod has made.
+func (bq *backoffQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
+	if podInfo.Attempts == 0 {
+		// A Pod that hasn't experienced any scheduling attempts yet
+		// isn't obliged to get a backoff penalty at all.
+		return 0
+	}
+
+	duration := bq.podInitialBackoff
+	for i := 1; i < podInfo.Attempts; i++ {
+		// Use subtraction instead of addition or multiplication to avoid overflow.
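+		// For example, with podInitialBackoff=1s and podMaxBackoff=10s, the duration
+		// doubles through 1s, 2s, 4s, 8s; on the next attempt 8s > 10s-8s holds, so
+		// the result is capped at the 10s maximum instead of doubling past it.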
+		if duration > bq.podMaxBackoff-duration {
+			return bq.podMaxBackoff
+		}
+		duration += duration
+	}
+	return duration
+}
+
+func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo), queue *heap.Heap[*framework.QueuedPodInfo]) {
+	for {
+		pInfo, ok := queue.Peek()
+		if !ok || pInfo == nil {
+			break
+		}
+		pod := pInfo.Pod
+		if bq.isPodBackingoff(pInfo) {
+			break
+		}
+		_, err := queue.Pop()
+		if err != nil {
+			logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
+			break
+		}
+		if fn != nil {
+			fn(pInfo)
+		}
+	}
+}
+
+// popEachBackoffCompleted runs fn for all pods in podBackoffQ and podErrorBackoffQ that have completed backoff, popping them from the queues.
+func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo)) {
+	// Drain the completed pods from both queues.
+	bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podBackoffQ)
+	bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podErrorBackoffQ)
+}
+
+// add adds the pInfo to backoffQueue.
+// It also ensures that pInfo is not present in both internal queues at once.
+func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) {
+	// If the pod has neither unschedulable plugins nor pending plugins,
+	// it failed because of an error and should be moved to podErrorBackoffQ.
+	if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
+		bq.podErrorBackoffQ.AddOrUpdate(pInfo)
+		// Ensure the pod is not in podBackoffQ and report an error if it was found there.
+		err := bq.podBackoffQ.Delete(pInfo)
+		if err == nil {
+			logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podBackoffQ", "pod", klog.KObj(pInfo.Pod))
+		}
+		return
+	}
+	bq.podBackoffQ.AddOrUpdate(pInfo)
+	// Ensure the pod is not in podErrorBackoffQ and report an error if it was found there.
+	err := bq.podErrorBackoffQ.Delete(pInfo)
+	if err == nil {
+		logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podErrorBackoffQ", "pod", klog.KObj(pInfo.Pod))
+	}
+}
+
+// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
+// It returns the new pod info if updated, nil otherwise.
+func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
+	// If the pod is in the backoff queue, update it there.
+	if pInfo, exists := bq.podBackoffQ.Get(oldPodInfo); exists {
+		_ = pInfo.Update(newPod)
+		bq.podBackoffQ.AddOrUpdate(pInfo)
+		return pInfo
+	}
+	// If the pod is in the error backoff queue, update it there.
+	if pInfo, exists := bq.podErrorBackoffQ.Get(oldPodInfo); exists {
+		_ = pInfo.Update(newPod)
+		bq.podErrorBackoffQ.AddOrUpdate(pInfo)
+		return pInfo
+	}
+	return nil
+}
+
+// delete deletes the pInfo from backoffQueue.
+func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) {
+	_ = bq.podBackoffQ.Delete(pInfo)
+	_ = bq.podErrorBackoffQ.Delete(pInfo)
+}
+
+// get returns the pInfo matching the given pInfoLookup, if it exists.
+func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
+	pInfo, exists := bq.podBackoffQ.Get(pInfoLookup)
+	if exists {
+		return pInfo, true
+	}
+	return bq.podErrorBackoffQ.Get(pInfoLookup)
+}
+
+// has informs whether pInfo exists in the queue.
+func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
+	return bq.podBackoffQ.Has(pInfo) || bq.podErrorBackoffQ.Has(pInfo)
+}
+
+// list returns all pods that are in the queue.
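+// The result concatenates podBackoffQ and podErrorBackoffQ; callers must not rely on its ordering.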
+func (bq *backoffQueue) list() []*framework.QueuedPodInfo { + return append(bq.podBackoffQ.List(), bq.podErrorBackoffQ.List()...) +} + +// len returns length of the queue. +func (bq *backoffQueue) len() int { + return bq.podBackoffQ.Len() + bq.podErrorBackoffQ.Len() +} diff --git a/pkg/scheduler/backend/queue/backoff_queue_test.go b/pkg/scheduler/backend/queue/backoff_queue_test.go new file mode 100644 index 00000000000..d0468322c87 --- /dev/null +++ b/pkg/scheduler/backend/queue/backoff_queue_test.go @@ -0,0 +1,170 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "math" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/scheduler/framework" + st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/utils/clock" + testingclock "k8s.io/utils/clock/testing" +) + +func TestBackoffQueue_calculateBackoffDuration(t *testing.T) { + tests := []struct { + name string + initialBackoffDuration time.Duration + maxBackoffDuration time.Duration + podInfo *framework.QueuedPodInfo + want time.Duration + }{ + { + name: "no backoff", + initialBackoffDuration: 1 * time.Nanosecond, + maxBackoffDuration: 32 * time.Nanosecond, + podInfo: &framework.QueuedPodInfo{Attempts: 0}, + want: 0, + }, + { + name: "normal", + initialBackoffDuration: 1 * time.Nanosecond, + maxBackoffDuration: 32 * time.Nanosecond, + podInfo: &framework.QueuedPodInfo{Attempts: 16}, + want: 32 * time.Nanosecond, + }, + { + name: "overflow_32bit", + initialBackoffDuration: 1 * time.Nanosecond, + maxBackoffDuration: math.MaxInt32 * time.Nanosecond, + podInfo: &framework.QueuedPodInfo{Attempts: 32}, + want: math.MaxInt32 * time.Nanosecond, + }, + { + name: "overflow_64bit", + initialBackoffDuration: 1 * time.Nanosecond, + maxBackoffDuration: math.MaxInt64 * time.Nanosecond, + podInfo: &framework.QueuedPodInfo{Attempts: 64}, + want: math.MaxInt64 * time.Nanosecond, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bq := newBackoffQueue(clock.RealClock{}, tt.initialBackoffDuration, tt.maxBackoffDuration) + if got := bq.calculateBackoffDuration(tt.podInfo); got != tt.want { + t.Errorf("backoffQueue.calculateBackoffDuration() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + podInfos := map[string]*framework.QueuedPodInfo{ + "pod0": { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod0").Obj(), + }, + Timestamp: fakeClock.Now().Add(-time.Second), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + "pod1": { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod1").Obj(), + }, + Timestamp: fakeClock.Now().Add(time.Second), + Attempts: 1, + UnschedulablePlugins: sets.New("plugin"), + }, + "pod2": { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod2").Obj(), + }, + Timestamp: fakeClock.Now().Add(-time.Second), + 
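+			// pod2 and pod3 set no UnschedulablePlugins/PendingPlugins, so add() routes them to podErrorBackoffQ.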
Attempts: 1, + }, + "pod3": { + PodInfo: &framework.PodInfo{ + Pod: st.MakePod().Name("pod3").Obj(), + }, + Timestamp: fakeClock.Now().Add(time.Second), + Attempts: 1, + }, + } + tests := []struct { + name string + podsInBackoff []string + wantPods []string + }{ + { + name: "Both queues empty, no pods moved to activeQ", + podsInBackoff: []string{}, + wantPods: nil, + }, + { + name: "Pods only in backoffQ, some pods moved to activeQ", + podsInBackoff: []string{"pod0", "pod1"}, + wantPods: []string{"pod0"}, + }, + { + name: "Pods only in errorBackoffQ, some pods moved to activeQ", + podsInBackoff: []string{"pod2", "pod3"}, + wantPods: []string{"pod2"}, + }, + { + name: "Pods in both queues, some pods moved to activeQ", + podsInBackoff: []string{"pod0", "pod1", "pod2", "pod3"}, + wantPods: []string{"pod0", "pod2"}, + }, + { + name: "Pods in both queues, all pods moved to activeQ", + podsInBackoff: []string{"pod0", "pod2"}, + wantPods: []string{"pod0", "pod2"}, + }, + { + name: "Pods in both queues, no pods moved to activeQ", + podsInBackoff: []string{"pod1", "pod3"}, + wantPods: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration) + for _, podName := range tt.podsInBackoff { + bq.add(logger, podInfos[podName]) + } + var gotPods []string + bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { + gotPods = append(gotPods, pInfo.Pod.Name) + }) + if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" { + t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff) + } + podsToStayInBackoff := len(tt.podsInBackoff) - len(tt.wantPods) + if bq.len() != podsToStayInBackoff { + t.Errorf("Expected %v pods to stay in backoffQ, but got: %v", podsToStayInBackoff, bq.len()) + } + }) + } +} diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 06f25037bb7..99d35698a87 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -162,17 +162,11 @@ type PriorityQueue struct { // Correct locking order is: lock > activeQueue.lock > nominator.nLock. lock sync.RWMutex - // pod initial backoff duration. - podInitialBackoffDuration time.Duration - // pod maximum backoff duration. - podMaxBackoffDuration time.Duration // the maximum time a pod can stay in the unschedulablePods. podMaxInUnschedulablePodsDuration time.Duration - activeQ activeQueuer - // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff - // are popped from this heap before the scheduler looks at activeQ - podBackoffQ *heap.Heap[*framework.QueuedPodInfo] + activeQ activeQueuer + backoffQ backoffQueuer // unschedulablePods holds pods that have been tried and determined unschedulable. 
unschedulablePods *UnschedulablePods // moveRequestCycle caches the sequence number of scheduling cycle when we @@ -335,10 +329,9 @@ func NewPriorityQueue( pq := &PriorityQueue{ clock: options.clock, stop: make(chan struct{}), - podInitialBackoffDuration: options.podInitialBackoffDuration, - podMaxBackoffDuration: options.podMaxBackoffDuration, podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder), + backoffQ: newBackoffQueue(options.clock, options.podInitialBackoffDuration, options.podMaxBackoffDuration), unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), preEnqueuePluginMap: options.preEnqueuePluginMap, queueingHintMap: options.queueingHintMap, @@ -347,14 +340,13 @@ func NewPriorityQueue( moveRequestCycle: -1, isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled, } - pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() pq.nominator = newPodNominator(options.podLister) return pq } -// Run starts the goroutine to pump from podBackoffQ to activeQ +// Run starts the goroutine to pump from backoffQ to activeQ func (p *PriorityQueue) Run(logger klog.Logger) { go wait.Until(func() { p.flushBackoffQCompleted(logger) @@ -568,7 +560,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue if unlockedActiveQ.Has(pInfo) { return } - if p.podBackoffQ.Has(pInfo) { + if p.backoffQ.has(pInfo) { return } p.unschedulablePods.addOrUpdate(pInfo) @@ -583,7 +575,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue added = true p.unschedulablePods.delete(pInfo.Pod, gatedBefore) - _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. + p.backoffQ.delete(pInfo) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() { @@ -641,7 +633,7 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { // If the pod doesn't belong to unschedulablePods or backoffQ, don't activate it. // The pod can be already in activeQ. var exists bool - pInfo, exists = p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)) + pInfo, exists = p.backoffQ.get(newQueuedPodInfoForLookup(pod)) if !exists { return false } @@ -656,13 +648,6 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { return p.moveToActiveQ(logger, pInfo, framework.ForceActivate) } -// isPodBackingoff returns true if a pod is still waiting for its backoff timer. -// If this returns true, the pod should not be re-tried. -func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool { - boTime := p.getBackoffTime(podInfo) - return boTime.After(p.clock.Now()) -} - // SchedulingCycle returns current scheduling cycle. 
func (p *PriorityQueue) SchedulingCycle() int64 { return p.activeQ.schedulingCycle() @@ -712,7 +697,7 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger // addUnschedulableIfNotPresentWithoutQueueingHint inserts a pod that cannot be scheduled into // the queue, unless it is already in the queue. Normally, PriorityQueue puts // unschedulable pods in `unschedulablePods`. But if there has been a recent move -// request, then the pod is put in `podBackoffQ`. +// request, then the pod is put in `backoffQ`. // TODO: This function is called only when p.isSchedulingQueueHintEnabled is false, // and this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed. func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error { @@ -736,7 +721,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, // - No unschedulable plugins are associated with this Pod, // meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue. // In this case, we should retry scheduling it because this Pod may not be retried until the next flush. - p.podBackoffQ.AddOrUpdate(pInfo) + p.backoffQ.add(logger, pInfo) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc() } else { @@ -751,7 +736,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, // AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into // the queue, unless it is already in the queue. Normally, PriorityQueue puts // unschedulable pods in `unschedulablePods`. But if there has been a recent move -// request, then the pod is put in `podBackoffQ`. +// request, then the pod is put in `backoffQ`. func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error { p.lock.Lock() defer p.lock.Unlock() @@ -767,7 +752,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * if p.activeQ.has(pInfo) { return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod)) } - if p.podBackoffQ.Has(pInfo) { + if p.backoffQ.has(pInfo) { return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod)) } @@ -805,25 +790,11 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) { p.lock.Lock() defer p.lock.Unlock() activated := false - for { - pInfo, ok := p.podBackoffQ.Peek() - if !ok || pInfo == nil { - break - } - pod := pInfo.Pod - if p.isPodBackingoff(pInfo) { - break - } - _, err := p.podBackoffQ.Pop() - if err != nil { - logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod)) - break - } + p.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added { activated = true } - } - + }) if activated { p.activeQ.broadcast() } @@ -928,10 +899,8 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { } // If the pod is in the backoff queue, update it there. 
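+	// backoffQ.update looks oldPodInfo up in both podBackoffQ and podErrorBackoffQ
+	// and returns the updated info if it was found in either of them.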
- if pInfo, exists := p.podBackoffQ.Get(oldPodInfo); exists { - _ = pInfo.Update(newPod) + if pInfo := p.backoffQ.update(newPod, oldPodInfo); pInfo != nil { p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - p.podBackoffQ.AddOrUpdate(pInfo) return } } @@ -961,8 +930,10 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { return } if isPodUpdated(oldPod, newPod) { - if p.isPodBackingoff(pInfo) { - p.podBackoffQ.AddOrUpdate(pInfo) + // Pod might have completed its backoff time while being in unschedulablePods, + // so we should check isPodBackingoff before moving the pod to backoffQ. + if p.backoffQ.isPodBackingoff(pInfo) { + p.backoffQ.add(logger, pInfo) p.unschedulablePods.delete(pInfo.Pod, gated) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ) return @@ -994,7 +965,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) { pInfo := newQueuedPodInfoForLookup(pod) if err := p.activeQ.delete(pInfo); err != nil { // The item was probably not found in the activeQ. - p.podBackoffQ.Delete(pInfo) + p.backoffQ.delete(pInfo) if pInfo = p.unschedulablePods.get(pod); pInfo != nil { p.unschedulablePods.delete(pod, pInfo.Gated) } @@ -1070,8 +1041,10 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra return unschedulablePods } - if strategy == queueAfterBackoff && p.isPodBackingoff(pInfo) { - p.podBackoffQ.AddOrUpdate(pInfo) + // Pod might have completed its backoff time while being in unschedulablePods, + // so we should check isPodBackingoff before moving the pod to backoffQ. + if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) { + p.backoffQ.add(logger, pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() return backoffQ } @@ -1197,7 +1170,7 @@ func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedP }, }, } - if pInfo, ok = p.podBackoffQ.Get(pInfoLookup); ok { + if pInfo, ok = p.backoffQ.get(pInfoLookup); ok { return pInfo, true } if pInfo = p.unschedulablePods.get(pInfoLookup.Pod); pInfo != nil { @@ -1218,13 +1191,13 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { defer p.lock.RUnlock() result := p.activeQ.list() activeQLen := len(result) - for _, pInfo := range p.podBackoffQ.List() { + for _, pInfo := range p.backoffQ.list() { result = append(result, pInfo.Pod) } for _, pInfo := range p.unschedulablePods.podInfoMap { result = append(result, pInfo.Pod) } - return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap)) + return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.backoffQ.len(), len(p.unschedulablePods.podInfoMap)) } // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. @@ -1242,7 +1215,7 @@ func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedAc return queuedPodInfo.PodInfo } - queuedPodInfo, exists = p.podBackoffQ.Get(pInfoLookup) + queuedPodInfo, exists = p.backoffQ.get(pInfoLookup) if exists { return queuedPodInfo.PodInfo } @@ -1276,12 +1249,6 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn return pods } -func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { - bo1 := p.getBackoffTime(pInfo1) - bo2 := p.getBackoffTime(pInfo2) - return bo1.Before(bo2) -} - // newQueuedPodInfo builds a QueuedPodInfo object. 
func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod, plugins ...string) *framework.QueuedPodInfo { now := p.clock.Now() @@ -1296,33 +1263,6 @@ func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod, plugins ...string) *framew } } -// getBackoffTime returns the time that podInfo completes backoff -func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time { - duration := p.calculateBackoffDuration(podInfo) - backoffTime := podInfo.Timestamp.Add(duration) - return backoffTime -} - -// calculateBackoffDuration is a helper function for calculating the backoffDuration -// based on the number of attempts the pod has made. -func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration { - if podInfo.Attempts == 0 { - // When the Pod hasn't experienced any scheduling attempts, - // they aren't obliged to get a backoff penalty at all. - return 0 - } - - duration := p.podInitialBackoffDuration - for i := 1; i < podInfo.Attempts; i++ { - // Use subtraction instead of addition or multiplication to avoid overflow. - if duration > p.podMaxBackoffDuration-duration { - return p.podMaxBackoffDuration - } - duration += duration - } - return duration -} - // UnschedulablePods holds pods that cannot be scheduled. This data structure // is used to implement unschedulablePods. type UnschedulablePods struct { diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index cd7cbc027ff..89fecf601ef 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -19,7 +19,6 @@ package queue import ( "context" "fmt" - "math" "strings" "sync" "testing" @@ -836,7 +835,7 @@ func Test_InFlightPods(t *testing.T) { } if test.wantBackoffQPodNames != nil { - podInfos := q.podBackoffQ.List() + podInfos := q.backoffQ.list() var podNames []string for _, pInfo := range podInfos { podNames = append(podNames, pInfo.Pod.Name) @@ -1006,8 +1005,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { // Since there was a move request at the same cycle as "oldCycle", these pods // should be in the backoff queue. for i := 1; i < totalNum; i++ { - if !q.podBackoffQ.Has(newQueuedPodInfoForLookup(&expectedPods[i])) { - t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name) + if !q.backoffQ.has(newQueuedPodInfoForLookup(&expectedPods[i])) { + t.Errorf("Expected %v to be added to backoffQ.", expectedPods[i].Name) } } } @@ -1099,7 +1098,7 @@ func TestPriorityQueue_Update(t *testing.T) { wantQ: backoffQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) - q.podBackoffQ.AddOrUpdate(podInfo) + q.backoffQ.add(logger, podInfo) return podInfo.Pod, podInfo.Pod }, schedulingHintsEnablement: []bool{false, true}, @@ -1124,7 +1123,7 @@ func TestPriorityQueue_Update(t *testing.T) { updatedPod.Annotations["foo"] = "test1" // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. 
- c.Step(q.podInitialBackoffDuration) + c.Step(q.backoffQ.podInitialBackoffDuration()) return medPriorityPodInfo.Pod, updatedPod }, schedulingHintsEnablement: []bool{false, true}, @@ -1174,7 +1173,7 @@ func TestPriorityQueue_Update(t *testing.T) { var pInfo *framework.QueuedPodInfo // validate expected queue - if pInfoFromBackoff, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if pInfoFromBackoff, exists := q.backoffQ.get(newQueuedPodInfoForLookup(newPod)); exists { if tt.wantQ != backoffQ { t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name) } @@ -1260,7 +1259,7 @@ func TestPriorityQueue_UpdateWhenInflight(t *testing.T) { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) } - pInfo, exists := q.podBackoffQ.Get(newQueuedPodInfoForLookup(updatedPod)) + pInfo, exists := q.backoffQ.get(newQueuedPodInfoForLookup(updatedPod)) if !exists { t.Fatalf("expected pod %s to be queued to backoffQ, but it wasn't.", updatedPod.Name) } @@ -1298,7 +1297,7 @@ func TestPriorityQueue_Activate(t *testing.T) { tests := []struct { name string qPodInfoInUnschedulablePods []*framework.QueuedPodInfo - qPodInfoInPodBackoffQ []*framework.QueuedPodInfo + qPodInfoInBackoffQ []*framework.QueuedPodInfo qPodInActiveQ []*v1.Pod qPodInfoToActivate *framework.QueuedPodInfo qPodInInFlightPod *v1.Pod @@ -1313,12 +1312,12 @@ func TestPriorityQueue_Activate(t *testing.T) { want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, // 1 already active }, { - name: "pod not in unschedulablePods/podBackoffQ", + name: "pod not in unschedulablePods/backoffQ", qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, want: []*framework.QueuedPodInfo{}, }, { - name: "[QHint] pod not in unschedulablePods/podBackoffQ but in-flight", + name: "[QHint] pod not in unschedulablePods/backoffQ but in-flight", qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, qPodInInFlightPod: highPriNominatedPodInfo.Pod, expectedInFlightEvent: &clusterEvent{oldObj: (*v1.Pod)(nil), newObj: highPriNominatedPodInfo.Pod, event: framework.EventForceActivate}, @@ -1326,7 +1325,7 @@ func TestPriorityQueue_Activate(t *testing.T) { qHintEnabled: true, }, { - name: "[QHint] pod not in unschedulablePods/podBackoffQ and not in-flight", + name: "[QHint] pod not in unschedulablePods/backoffQ and not in-flight", qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, qPodInInFlightPod: medPriorityPodInfo.Pod, // different pod is in-flight want: []*framework.QueuedPodInfo{}, @@ -1339,10 +1338,10 @@ func TestPriorityQueue_Activate(t *testing.T) { want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, }, { - name: "pod in backoffQ", - qPodInfoInPodBackoffQ: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, - qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, - want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, + name: "pod in backoffQ", + qPodInfoInBackoffQ: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, + qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, + want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, }, } @@ -1372,7 +1371,7 @@ func TestPriorityQueue_Activate(t *testing.T) { } } - // Prepare activeQ/unschedulablePods/podBackoffQ according to the table + // Prepare activeQ/unschedulablePods/backoffQ according to the table for _, qPod := range tt.qPodInActiveQ { 
q.Add(logger, qPod) } @@ -1381,8 +1380,8 @@ func TestPriorityQueue_Activate(t *testing.T) { q.unschedulablePods.addOrUpdate(qPodInfo) } - for _, qPodInfo := range tt.qPodInfoInPodBackoffQ { - q.podBackoffQ.AddOrUpdate(qPodInfo) + for _, qPodInfo := range tt.qPodInfoInBackoffQ { + q.backoffQ.add(logger, qPodInfo) } // Activate specific pod according to the table @@ -1722,7 +1721,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil) - if q.podBackoffQ.Len() == 0 && test.expectedQ == backoffQ { + if q.backoffQ.len() == 0 && test.expectedQ == backoffQ { t.Fatalf("expected pod to be queued to backoffQ, but it was not") } @@ -1811,15 +1810,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) // hpp2 won't be moved. - if q.podBackoffQ.Len() != 3 { - t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) + if q.backoffQ.len() != 3 { + t.Fatalf("Expected 3 items to be in backoffQ, but got: %v", q.backoffQ.len()) } // pop out the pods in the backoffQ. // This doesn't make them in-flight pods. - for q.podBackoffQ.Len() != 0 { - q.podBackoffQ.Pop() - } + c.Step(q.backoffQ.podMaxBackoffDuration()) + q.backoffQ.popEachBackoffCompleted(logger, nil) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) q.Add(logger, unschedulablePodInfo.Pod) @@ -1863,20 +1861,20 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { t.Errorf("Expected %v in the unschedulablePods", pod.Name) } } - if !q.podBackoffQ.Has(hpp1QueuedPodInfo) { - t.Errorf("Expected %v in the podBackoffQ", hpp1.Name) + if !q.backoffQ.has(hpp1QueuedPodInfo) { + t.Errorf("Expected %v in the backoffQ", hpp1.Name) } // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. - c.Step(q.podInitialBackoffDuration) + c.Step(q.backoffQ.podInitialBackoffDuration()) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil) if q.activeQ.len() != 4 { t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len()) } - if q.podBackoffQ.Len() != 0 { - t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) + if q.backoffQ.len() != 0 { + t.Errorf("Expected 0 item to be in backoffQ, but got: %v", q.backoffQ.len()) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) if len(q.unschedulablePods.podInfoMap) != 1 { @@ -1935,18 +1933,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod, p.Pod.Name) } // hpp2 won't be moved. - if q.podBackoffQ.Len() != 3 { - t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) + if q.backoffQ.len() != 3 { + t.Fatalf("Expected 3 items to be in backoffQ, but got: %v", q.backoffQ.len()) } // pop out the pods in the backoffQ. // This doesn't make them in-flight pods. 
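+	// The heaps are no longer reachable directly, so advance the fake clock past the
+	// maximum backoff and drain both backoff queues via popEachBackoffCompleted.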
- for q.podBackoffQ.Len() != 0 { - _, err = q.podBackoffQ.Pop() - if err != nil { - t.Errorf("pop failed: %v", err) - } - } + c.Step(q.backoffQ.podMaxBackoffDuration()) + q.backoffQ.popEachBackoffCompleted(logger, nil) unschedulableQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")) highPriorityQueuedPodInfo := attemptQueuedPodInfo(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")) @@ -1971,18 +1965,20 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi t.Errorf("Expected %v in the unschedulablePods", pod.Name) } } - q.podBackoffQ.Get(hpp1QueuedPodInfo) + if !q.backoffQ.has(hpp1QueuedPodInfo) { + t.Errorf("Expected %v in the backoffQ", hpp1.Name) + } // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. - c.Step(q.podInitialBackoffDuration) + c.Step(q.backoffQ.podInitialBackoffDuration()) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. q.MoveAllToActiveOrBackoffQueue(logger, nodeAdd, nil, nil, nil) if q.activeQ.len() != 4 { t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len()) } - if q.podBackoffQ.Len() != 0 { - t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) + if q.backoffQ.len() != 0 { + t.Errorf("Expected 0 item to be in backoffQ, but got: %v", q.backoffQ.len()) } if len(q.unschedulablePods.podInfoMap) != 1 { // hpp2 won't be moved regardless of its backoff timer. @@ -2456,12 +2452,12 @@ func TestPriorityQueue_NewWithOptions(t *testing.T) { WithPodMaxBackoffDuration(20*time.Second), ) - if q.podInitialBackoffDuration != 2*time.Second { - t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.podInitialBackoffDuration) + if q.backoffQ.podInitialBackoffDuration() != 2*time.Second { + t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.backoffQ.podInitialBackoffDuration()) } - if q.podMaxBackoffDuration != 20*time.Second { - t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.podMaxBackoffDuration) + if q.backoffQ.podMaxBackoffDuration() != 20*time.Second { + t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.backoffQ.podMaxBackoffDuration()) } } @@ -2993,7 +2989,7 @@ var ( queue.Update(logger, pInfo.Pod, newPod) } addPodBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.podBackoffQ.AddOrUpdate(pInfo) + queue.backoffQ.add(logger, pInfo) } moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) @@ -3639,12 +3635,12 @@ func TestBackOffFlow(t *testing.T) { // An event happens. q.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) - if !q.podBackoffQ.Has(podInfo) { + if !q.backoffQ.has(podInfo) { t.Errorf("pod %v is not in the backoff queue", podID) } // Check backoff duration. 
- deadline := q.getBackoffTime(podInfo) + deadline := q.backoffQ.getBackoffTime(podInfo) backoff := deadline.Sub(timestamp) if backoff != step.wantBackoff { t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff) @@ -3654,13 +3650,13 @@ func TestBackOffFlow(t *testing.T) { cl.Step(time.Millisecond) q.flushBackoffQCompleted(logger) // Still in backoff queue after an early flush. - if !q.podBackoffQ.Has(podInfo) { + if !q.backoffQ.has(podInfo) { t.Errorf("pod %v is not in the backoff queue", podID) } // Moved out of the backoff queue after timeout. cl.Step(backoff) q.flushBackoffQCompleted(logger) - if q.podBackoffQ.Has(podInfo) { + if q.backoffQ.has(podInfo) { t.Errorf("pod %v is still in the backoff queue", podID) } }) @@ -3725,10 +3721,11 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + c := testingclock.NewFakeClock(time.Now()) logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - q := NewTestQueue(ctx, newDefaultQueueSort()) + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c)) for i, podInfo := range tt.podInfos { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. q.Add(logger, podInfo.Pod) @@ -3749,13 +3746,10 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { } q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck) var got []string - for q.podBackoffQ.Len() != 0 { - queuedPodInfo, err := q.podBackoffQ.Pop() - if err != nil { - t.Fatalf("Fail to pop pod from backoffQ: %v", err) - } - got = append(got, queuedPodInfo.Pod.Name) - } + c.Step(2 * q.backoffQ.podMaxBackoffDuration()) + q.backoffQ.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { + got = append(got, pInfo.Pod.Name) + }) if diff := cmp.Diff(tt.want, got); diff != "" { t.Errorf("Unexpected diff (-want, +got):\n%s", diff) } @@ -3777,49 +3771,6 @@ func makeQueuedPodInfos(num int, namePrefix, label string, timestamp time.Time) return pInfos } -func TestPriorityQueue_calculateBackoffDuration(t *testing.T) { - tests := []struct { - name string - initialBackoffDuration time.Duration - maxBackoffDuration time.Duration - podInfo *framework.QueuedPodInfo - want time.Duration - }{ - { - name: "normal", - initialBackoffDuration: 1 * time.Nanosecond, - maxBackoffDuration: 32 * time.Nanosecond, - podInfo: &framework.QueuedPodInfo{Attempts: 16}, - want: 32 * time.Nanosecond, - }, - { - name: "overflow_32bit", - initialBackoffDuration: 1 * time.Nanosecond, - maxBackoffDuration: math.MaxInt32 * time.Nanosecond, - podInfo: &framework.QueuedPodInfo{Attempts: 32}, - want: math.MaxInt32 * time.Nanosecond, - }, - { - name: "overflow_64bit", - initialBackoffDuration: 1 * time.Nanosecond, - maxBackoffDuration: math.MaxInt64 * time.Nanosecond, - podInfo: &framework.QueuedPodInfo{Attempts: 64}, - want: math.MaxInt64 * time.Nanosecond, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - q := NewTestQueue(ctx, newDefaultQueueSort(), WithPodInitialBackoffDuration(tt.initialBackoffDuration), WithPodMaxBackoffDuration(tt.maxBackoffDuration)) - if got := q.calculateBackoffDuration(tt.podInfo); got != tt.want { - t.Errorf("PriorityQueue.calculateBackoffDuration() = %v, want %v", got, tt.want) - } - }) - } -} - func mustNewTestPodInfo(t 
*testing.T, pod *v1.Pod) *framework.PodInfo { podInfo, err := framework.NewPodInfo(pod) if err != nil { @@ -4190,12 +4141,12 @@ func TestPriorityQueue_GetPod(t *testing.T) { }, } - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) q := NewTestQueue(ctx, newDefaultQueueSort()) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(activeQPod)) }) - q.podBackoffQ.AddOrUpdate(newQueuedPodInfoForLookup(backoffQPod)) + q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod)) q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod)) tests := []struct {