diff --git a/pkg/scheduler/internal/queue/active_queue.go b/pkg/scheduler/internal/queue/active_queue.go index 91149bf5752..223c7229cdd 100644 --- a/pkg/scheduler/internal/queue/active_queue.go +++ b/pkg/scheduler/internal/queue/active_queue.go @@ -35,9 +35,11 @@ import ( // getLock() methods should be used only for unlocked() methods // and it is forbidden to call any other activeQueuer's method under this lock. type activeQueuer interface { - getLock() *sync.RWMutex - unlocked() unlockedActiveQueuer + underLock(func(unlockedActiveQ unlockedActiveQueuer)) + underRLock(func(unlockedActiveQ unlockedActiveQueueReader)) + update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo + delete(pInfo *framework.QueuedPodInfo) error pop(logger klog.Logger) (*framework.QueuedPodInfo, error) list() []*v1.Pod len() int @@ -56,12 +58,17 @@ type activeQueuer interface { } // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. -// getLock() methods should be used to protect these methods. +// underLock() method should be used to protect these methods. type unlockedActiveQueuer interface { + unlockedActiveQueueReader + AddOrUpdate(pInfo *framework.QueuedPodInfo) +} + +// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself. +// underLock() or underRLock() method should be used to protect these methods. +type unlockedActiveQueueReader interface { Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) Has(pInfo *framework.QueuedPodInfo) bool - AddOrUpdate(pInfo *framework.QueuedPodInfo) - Delete(pInfo *framework.QueuedPodInfo) error } // activeQueue implements activeQueuer. All of the fields have to be protected using the lock. @@ -130,15 +137,44 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu return aq } -// getLock returns lock of activeQueue. Its methods should be used only to protect the unlocked() methods. -func (aq *activeQueue) getLock() *sync.RWMutex { - return &aq.lock +// underLock runs the fn function under the lock.Lock. +// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method, +// as it would end up in deadlock. +func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { + aq.lock.Lock() + defer aq.lock.Unlock() + fn(aq.queue) } -// unlocked returns queue methods, that are not protected by the lock itself. -// getLock() methods should be used to protect queue methods. -func (aq *activeQueue) unlocked() unlockedActiveQueuer { - return aq.queue +// underLock runs the fn function under the lock.RLock. +// fn can run unlockedActiveQueueReader methods but should NOT run any other activeQueue method, +// as it would end up in deadlock. +func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) { + aq.lock.RLock() + defer aq.lock.RUnlock() + fn(aq.queue) +} + +// update updates the pod in activeQ if oldPodInfo is already in the queue. +// It returns new pod info if updated, nil otherwise. +func (aq *activeQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo { + aq.lock.Lock() + defer aq.lock.Unlock() + + if pInfo, exists := aq.queue.Get(oldPodInfo); exists { + _ = pInfo.Update(newPod) + aq.queue.AddOrUpdate(pInfo) + return pInfo + } + return nil +} + +// delete deletes the pod info from activeQ. +func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error { + aq.lock.Lock() + defer aq.lock.Unlock() + + return aq.queue.Delete(pInfo) } // pop removes the head of the queue and returns it. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index b4fb251f742..25b1e993842 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -541,35 +541,36 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue gatedBefore := pInfo.Gated pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if pInfo.Gated { - // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. - if p.activeQ.unlocked().Has(pInfo) { - return false + added := false + p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + if pInfo.Gated { + // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. + if unlockedActiveQ.Has(pInfo) { + return + } + if p.podBackoffQ.Has(pInfo) { + return + } + p.unschedulablePods.addOrUpdate(pInfo) + return } - if p.podBackoffQ.Has(pInfo) { - return false + if pInfo.InitialAttemptTimestamp == nil { + now := p.clock.Now() + pInfo.InitialAttemptTimestamp = &now } - p.unschedulablePods.addOrUpdate(pInfo) - return false - } - if pInfo.InitialAttemptTimestamp == nil { - now := p.clock.Now() - pInfo.InitialAttemptTimestamp = &now - } - p.activeQ.unlocked().AddOrUpdate(pInfo) + unlockedActiveQ.AddOrUpdate(pInfo) + added = true - p.unschedulablePods.delete(pInfo.Pod, gatedBefore) - _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) - metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() - if event == framework.PodAdd || event == framework.PodUpdate { - p.AddNominatedPod(logger, pInfo.PodInfo, nil) - } - - return true + p.unschedulablePods.delete(pInfo.Pod, gatedBefore) + _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() + if event == framework.PodAdd || event == framework.PodUpdate { + p.AddNominatedPod(logger, pInfo.PodInfo, nil) + } + }) + return added } // Add adds a pod to the active queue. It should be called only when a new pod @@ -859,18 +860,6 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { return !reflect.DeepEqual(strip(oldPod), strip(newPod)) } -func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if pInfo, exists := p.activeQ.unlocked().Get(oldPodInfo); exists { - _ = pInfo.Update(newPod) - p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - p.activeQ.unlocked().AddOrUpdate(pInfo) - return true - } - return false -} - // Update updates a pod in the active or backoff queue if present. Otherwise, it removes // the item from the unschedulable queue if pod is updated in a way that it may // become schedulable and adds the updated one to the active queue. @@ -894,7 +883,8 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { if oldPod != nil { oldPodInfo := newQueuedPodInfoForLookup(oldPod) // If the pod is already in the active queue, just update it there. - if exists := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists { + if pInfo := p.activeQ.update(newPod, oldPodInfo); pInfo != nil { + p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) return } @@ -964,9 +954,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) { defer p.lock.Unlock() p.DeleteNominatedPodIfExists(pod) pInfo := newQueuedPodInfoForLookup(pod) - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if err := p.activeQ.unlocked().Delete(pInfo); err != nil { + if err := p.activeQ.delete(pInfo); err != nil { // The item was probably not found in the activeQ. p.podBackoffQ.Delete(pInfo) if pInfo = p.unschedulablePods.get(pod); pInfo != nil { @@ -1174,11 +1162,11 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { } // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. -func (p *PriorityQueue) nominatedPodToInfo(np podRef) *framework.PodInfo { +func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueueReader) *framework.PodInfo { pod := np.toPod() pInfoLookup := newQueuedPodInfoForLookup(pod) - queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup) + queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup) if exists { return queuedPodInfo.PodInfo } @@ -1213,12 +1201,12 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn defer p.lock.RUnlock() nominatedPods := p.nominator.nominatedPodsForNode(nodeName) - p.activeQ.getLock().RLock() - defer p.activeQ.getLock().RUnlock() pods := make([]*framework.PodInfo, len(nominatedPods)) - for i, np := range nominatedPods { - pods[i] = p.nominatedPodToInfo(np).DeepCopy() - } + p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { + for i, np := range nominatedPods { + pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy() + } + }) return pods } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 9a6e4cc1e9b..566fe6aa5b0 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1064,9 +1064,8 @@ func TestPriorityQueue_Update(t *testing.T) { name: "when updating a pod which is in flightPods, the pod will not be added to any queue", wantQ: notInAnyQueue, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. - q.activeQ.unlocked().AddOrUpdate(podInfo) + q.Add(logger, medPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1102,12 +1101,14 @@ func TestPriorityQueue_Update(t *testing.T) { pInfo = pInfoFromBackoff } - if pInfoFromActive, exists := q.activeQ.unlocked().Get(newQueuedPodInfoForLookup(newPod)); exists { - if tt.wantQ != activeQ { - t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { + if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if tt.wantQ != activeQ { + t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + } + pInfo = pInfoFromActive } - pInfo = pInfoFromActive - } + }) if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { if tt.wantQ != unschedulablePods { @@ -1197,10 +1198,10 @@ func TestPriorityQueue_Delete(t *testing.T) { q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(logger, unschedulablePodInfo.Pod) q.Delete(highPriNominatedPodInfo.Pod) - if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { + if !q.activeQ.has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) } - if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { + if q.activeQ.has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } if len(q.nominator.nominatedPods) != 1 { @@ -1217,13 +1218,13 @@ func TestPriorityQueue_Activate(t *testing.T) { name string qPodInfoInUnschedulablePods []*framework.QueuedPodInfo qPodInfoInPodBackoffQ []*framework.QueuedPodInfo - qPodInfoInActiveQ []*framework.QueuedPodInfo + qPodInActiveQ []*v1.Pod qPodInfoToActivate *framework.QueuedPodInfo want []*framework.QueuedPodInfo }{ { name: "pod already in activeQ", - qPodInfoInActiveQ: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, + qPodInActiveQ: []*v1.Pod{highPriNominatedPodInfo.Pod}, qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, // 1 already active }, @@ -1255,8 +1256,8 @@ func TestPriorityQueue_Activate(t *testing.T) { q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) // Prepare activeQ/unschedulablePods/podBackoffQ according to the table - for _, qPodInfo := range tt.qPodInfoInActiveQ { - q.activeQ.unlocked().AddOrUpdate(qPodInfo) + for _, qPod := range tt.qPodInActiveQ { + q.Add(logger, qPod) } for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { @@ -1277,7 +1278,7 @@ func TestPriorityQueue_Activate(t *testing.T) { // Check if the specific pod exists in activeQ for _, want := range tt.want { - if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { + if !q.activeQ.has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name) } } @@ -1563,7 +1564,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. } cl := testingclock.NewFakeClock(now) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) + q.Add(logger, test.podInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) } @@ -1607,12 +1608,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.Add(logger, unschedulablePodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.Add(logger, highPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1628,7 +1629,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct a Pod, but don't associate its scheduler failure to any plugin hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.Add(logger, hpp1) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1641,7 +1642,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp2)) + q.Add(logger, hpp2) if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) } @@ -1676,17 +1677,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.Add(logger, unschedulablePodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.Add(logger, highPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.Add(logger, hpp1) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1946,7 +1947,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) + q.Add(logger, tt.unschedPod) if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) } @@ -1961,7 +1962,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q.AssignedPodAdded(logger, tt.updatedAssignedPod) - if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { + if q.activeQ.has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) } }) @@ -2061,11 +2062,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) { defer cancel() q := NewTestQueue(ctx, newDefaultQueueSort()) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.Add(logger, unschedulablePodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.Add(logger, highPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -2411,7 +2412,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) + q.Add(logger, unschedulablePod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } @@ -2549,11 +2550,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPod)) + q.Add(logger, highPod) if p, err := q.Pop(logger); err != nil || p.Pod != highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) } - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(midPod)) + q.Add(logger, midPod) if p, err := q.Pop(logger); err != nil || p.Pod != midPod { t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) } @@ -2673,7 +2674,7 @@ var ( // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // UnschedulablePlugins will get cleared by Pop, so make a copy first. unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() - queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.Add(logger, pInfo.Pod) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2689,7 +2690,7 @@ var ( } popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.Add(logger, pInfo.Pod) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2703,7 +2704,12 @@ var ( } } addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.activeQ.unlocked().AddOrUpdate(pInfo) + queue.Add(logger, pInfo.Pod) + } + addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { + queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(pInfo) + }) } addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { if !pInfo.Gated { @@ -2768,8 +2774,9 @@ func TestPodTimestamp(t *testing.T) { { name: "add two pod to activeQ and sort them by the timestamp", operations: []operation{ - addPodActiveQ, - addPodActiveQ, + // Need to add the pods directly to the activeQ to override the timestamps. + addPodActiveQDirectly, + addPodActiveQDirectly, }, operands: []*framework.QueuedPodInfo{pInfo2, pInfo1}, expected: []*framework.QueuedPodInfo{pInfo1, pInfo2}, @@ -2788,7 +2795,8 @@ func TestPodTimestamp(t *testing.T) { { name: "add one pod to BackoffQ and move it to activeQ", operations: []operation{ - addPodActiveQ, + // Need to add the pods directly to activeQ to override the timestamps. + addPodActiveQDirectly, addPodBackoffQ, flushBackoffQ, moveAllToActiveOrBackoffQ, @@ -3254,7 +3262,7 @@ func TestIncomingPodsMetrics(t *testing.T) { operations: []operation{ popAndRequeueAsUnschedulable, }, - want: ` + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 `, }, @@ -3264,7 +3272,8 @@ func TestIncomingPodsMetrics(t *testing.T) { popAndRequeueAsUnschedulable, moveAllToActiveOrBackoffQ, }, - want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="backoff"} 3 `, }, @@ -3275,7 +3284,8 @@ func TestIncomingPodsMetrics(t *testing.T) { moveClockForward, moveAllToActiveOrBackoffQ, }, - want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="active"} 3 `, }, @@ -3286,7 +3296,8 @@ func TestIncomingPodsMetrics(t *testing.T) { moveClockForward, flushBackoffQ, }, - want: ` scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3 + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3 scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="backoff"} 3 `, }, @@ -3449,7 +3460,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort()) for i, podInfo := range tt.podInfos { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) + q.Add(logger, podInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) }