From 9773a39b28a30dae6b8b471205b6e746440f068c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Tue, 20 Aug 2024 08:51:21 +0000 Subject: [PATCH 1/3] Don't expose lock outside activeQueue in scheduling queue --- pkg/scheduler/internal/queue/active_queue.go | 27 +++-- .../internal/queue/scheduling_queue.go | 100 +++++++++--------- .../internal/queue/scheduling_queue_test.go | 100 +++++++++++++----- 3 files changed, 139 insertions(+), 88 deletions(-) diff --git a/pkg/scheduler/internal/queue/active_queue.go b/pkg/scheduler/internal/queue/active_queue.go index 91149bf5752..4eb19a9448f 100644 --- a/pkg/scheduler/internal/queue/active_queue.go +++ b/pkg/scheduler/internal/queue/active_queue.go @@ -35,8 +35,8 @@ import ( // getLock() methods should be used only for unlocked() methods // and it is forbidden to call any other activeQueuer's method under this lock. type activeQueuer interface { - getLock() *sync.RWMutex - unlocked() unlockedActiveQueuer + underLock(func(unlockedActiveQ unlockedActiveQueuer)) + underRLock(func(unlockedActiveQ unlockedActiveQueuer)) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) list() []*v1.Pod @@ -56,7 +56,7 @@ type activeQueuer interface { } // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. -// getLock() methods should be used to protect these methods. +// underLock() and underRLock() methods should be used to protect these methods. type unlockedActiveQueuer interface { Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) Has(pInfo *framework.QueuedPodInfo) bool @@ -130,15 +130,22 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu return aq } -// getLock returns lock of activeQueue. Its methods should be used only to protect the unlocked() methods. -func (aq *activeQueue) getLock() *sync.RWMutex { - return &aq.lock +// underLock runs the fn function under the lock.Lock. 
+// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method, +// as it would end up in deadlock. +func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { + aq.lock.Lock() + defer aq.lock.Unlock() + fn(aq.queue) } -// unlocked returns queue methods, that are not protected by the lock itself. -// getLock() methods should be used to protect queue methods. -func (aq *activeQueue) unlocked() unlockedActiveQueuer { - return aq.queue +// underLock runs the fn function under the lock.RLock. +// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method, +// as it would end up in deadlock. +func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { + aq.lock.RLock() + defer aq.lock.RUnlock() + fn(aq.queue) } // pop removes the head of the queue and returns it. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index b4fb251f742..ed0a4dc8577 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -541,35 +541,36 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue gatedBefore := pInfo.Gated pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if pInfo.Gated { - // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. - if p.activeQ.unlocked().Has(pInfo) { - return false + added := false + p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + if pInfo.Gated { + // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. 
+ if unlockedActiveQ.Has(pInfo) { + return + } + if p.podBackoffQ.Has(pInfo) { + return + } + p.unschedulablePods.addOrUpdate(pInfo) + return } - if p.podBackoffQ.Has(pInfo) { - return false + if pInfo.InitialAttemptTimestamp == nil { + now := p.clock.Now() + pInfo.InitialAttemptTimestamp = &now } - p.unschedulablePods.addOrUpdate(pInfo) - return false - } - if pInfo.InitialAttemptTimestamp == nil { - now := p.clock.Now() - pInfo.InitialAttemptTimestamp = &now - } - p.activeQ.unlocked().AddOrUpdate(pInfo) + unlockedActiveQ.AddOrUpdate(pInfo) + added = true - p.unschedulablePods.delete(pInfo.Pod, gatedBefore) - _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) - metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() - if event == framework.PodAdd || event == framework.PodUpdate { - p.AddNominatedPod(logger, pInfo.PodInfo, nil) - } - - return true + p.unschedulablePods.delete(pInfo.Pod, gatedBefore) + _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() + if event == framework.PodAdd || event == framework.PodUpdate { + p.AddNominatedPod(logger, pInfo.PodInfo, nil) + } + }) + return added } // Add adds a pod to the active queue. 
It should be called only when a new pod @@ -860,15 +861,16 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { } func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if pInfo, exists := p.activeQ.unlocked().Get(oldPodInfo); exists { - _ = pInfo.Update(newPod) - p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - p.activeQ.unlocked().AddOrUpdate(pInfo) - return true - } - return false + exists := false + p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + if pInfo, exists := unlockedActiveQ.Get(oldPodInfo); exists { + _ = pInfo.Update(newPod) + p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) + unlockedActiveQ.AddOrUpdate(pInfo) + exists = true + } + }) + return exists } // Update updates a pod in the active or backoff queue if present. Otherwise, it removes @@ -964,15 +966,15 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) { defer p.lock.Unlock() p.DeleteNominatedPodIfExists(pod) pInfo := newQueuedPodInfoForLookup(pod) - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if err := p.activeQ.unlocked().Delete(pInfo); err != nil { - // The item was probably not found in the activeQ. - p.podBackoffQ.Delete(pInfo) - if pInfo = p.unschedulablePods.get(pod); pInfo != nil { - p.unschedulablePods.delete(pod, pInfo.Gated) + p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + if err := unlockedActiveQ.Delete(pInfo); err != nil { + // The item was probably not found in the activeQ. + p.podBackoffQ.Delete(pInfo) + if pInfo = p.unschedulablePods.get(pod); pInfo != nil { + p.unschedulablePods.delete(pod, pInfo.Gated) + } } - } + }) } // AssignedPodAdded is called when a bound pod is added. 
Creation of this pod @@ -1174,11 +1176,11 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { } // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. -func (p *PriorityQueue) nominatedPodToInfo(np podRef) *framework.PodInfo { +func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueuer) *framework.PodInfo { pod := np.toPod() pInfoLookup := newQueuedPodInfoForLookup(pod) - queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup) + queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup) if exists { return queuedPodInfo.PodInfo } @@ -1213,12 +1215,12 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn defer p.lock.RUnlock() nominatedPods := p.nominator.nominatedPodsForNode(nodeName) - p.activeQ.getLock().RLock() - defer p.activeQ.getLock().RUnlock() pods := make([]*framework.PodInfo, len(nominatedPods)) - for i, np := range nominatedPods { - pods[i] = p.nominatedPodToInfo(np).DeepCopy() - } + p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + for i, np := range nominatedPods { + pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy() + } + }) return pods } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 9a6e4cc1e9b..bdf4e037ee3 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1066,7 +1066,9 @@ func TestPriorityQueue_Update(t *testing.T) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. 
- q.activeQ.unlocked().AddOrUpdate(podInfo) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(podInfo) + }) if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1102,12 +1104,14 @@ func TestPriorityQueue_Update(t *testing.T) { pInfo = pInfoFromBackoff } - if pInfoFromActive, exists := q.activeQ.unlocked().Get(newQueuedPodInfoForLookup(newPod)); exists { - if tt.wantQ != activeQ { - t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if tt.wantQ != activeQ { + t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + } + pInfo = pInfoFromActive } - pInfo = pInfoFromActive - } + }) if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { if tt.wantQ != unschedulablePods { @@ -1197,10 +1201,10 @@ func TestPriorityQueue_Delete(t *testing.T) { q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(logger, unschedulablePodInfo.Pod) q.Delete(highPriNominatedPodInfo.Pod) - if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { + if !q.activeQ.has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) } - if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { + if q.activeQ.has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } if len(q.nominator.nominatedPods) != 1 { @@ -1256,7 +1260,9 @@ func TestPriorityQueue_Activate(t *testing.T) { // Prepare activeQ/unschedulablePods/podBackoffQ according to the table for _, qPodInfo := range 
tt.qPodInfoInActiveQ { - q.activeQ.unlocked().AddOrUpdate(qPodInfo) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(qPodInfo) + }) } for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { @@ -1277,7 +1283,7 @@ func TestPriorityQueue_Activate(t *testing.T) { // Check if the specific pod exists in activeQ for _, want := range tt.want { - if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { + if !q.activeQ.has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name) } } @@ -1563,7 +1569,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. } cl := testingclock.NewFakeClock(now) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) } @@ -1607,12 +1615,16 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. 
- q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1628,7 +1640,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct a Pod, but don't associate its scheduler failure to any plugin hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1641,7 +1655,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". 
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp2)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp2)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) } @@ -1676,17 +1692,23 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1946,7 +1968,9 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To 
simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) } @@ -1961,7 +1985,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q.AssignedPodAdded(logger, tt.updatedAssignedPod) - if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { + if q.activeQ.has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) } }) @@ -2061,11 +2085,15 @@ func TestPriorityQueue_PendingPods(t *testing.T) { defer cancel() q := NewTestQueue(ctx, newDefaultQueueSort()) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. 
- q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -2411,7 +2439,9 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } @@ -2549,11 +2579,15 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. 
- q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) } - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(midPod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(midPod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != midPod { t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) } @@ -2673,7 +2707,9 @@ var ( // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // UnschedulablePlugins will get cleared by Pop, so make a copy first. unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() - queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + }) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2689,7 +2725,9 @@ var ( } popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. 
- queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + }) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2703,7 +2741,9 @@ var ( } } addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.activeQ.unlocked().AddOrUpdate(pInfo) + queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(pInfo) + }) } addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { if !pInfo.Gated { @@ -3449,7 +3489,9 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort()) for i, podInfo := range tt.podInfos { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. 
- q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) } From 3eefd62f9433e5b4be9198a440ef40fac17f607c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Tue, 20 Aug 2024 09:21:06 +0000 Subject: [PATCH 2/3] Make update and delete active queue methods --- pkg/scheduler/internal/queue/active_queue.go | 41 ++++++++++++++++--- .../internal/queue/scheduling_queue.go | 34 +++++---------- .../internal/queue/scheduling_queue_test.go | 2 +- 3 files changed, 46 insertions(+), 31 deletions(-) diff --git a/pkg/scheduler/internal/queue/active_queue.go b/pkg/scheduler/internal/queue/active_queue.go index 4eb19a9448f..223c7229cdd 100644 --- a/pkg/scheduler/internal/queue/active_queue.go +++ b/pkg/scheduler/internal/queue/active_queue.go @@ -36,8 +36,10 @@ import ( // and it is forbidden to call any other activeQueuer's method under this lock. type activeQueuer interface { underLock(func(unlockedActiveQ unlockedActiveQueuer)) - underRLock(func(unlockedActiveQ unlockedActiveQueuer)) + underRLock(func(unlockedActiveQ unlockedActiveQueueReader)) + update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo + delete(pInfo *framework.QueuedPodInfo) error pop(logger klog.Logger) (*framework.QueuedPodInfo, error) list() []*v1.Pod len() int @@ -56,12 +58,17 @@ type activeQueuer interface { } // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. -// underLock() and underRLock() methods should be used to protect these methods. +// underLock() method should be used to protect these methods. 
type unlockedActiveQueuer interface { + unlockedActiveQueueReader + AddOrUpdate(pInfo *framework.QueuedPodInfo) +} + +// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself. +// underLock() or underRLock() method should be used to protect these methods. +type unlockedActiveQueueReader interface { Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) Has(pInfo *framework.QueuedPodInfo) bool - AddOrUpdate(pInfo *framework.QueuedPodInfo) - Delete(pInfo *framework.QueuedPodInfo) error } // activeQueue implements activeQueuer. All of the fields have to be protected using the lock. @@ -140,14 +147,36 @@ func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) } // underLock runs the fn function under the lock.RLock. -// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method, +// fn can run unlockedActiveQueueReader methods but should NOT run any other activeQueue method, // as it would end up in deadlock. -func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { +func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) { aq.lock.RLock() defer aq.lock.RUnlock() fn(aq.queue) } +// update updates the pod in activeQ if oldPodInfo is already in the queue. +// It returns new pod info if updated, nil otherwise. +func (aq *activeQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo { + aq.lock.Lock() + defer aq.lock.Unlock() + + if pInfo, exists := aq.queue.Get(oldPodInfo); exists { + _ = pInfo.Update(newPod) + aq.queue.AddOrUpdate(pInfo) + return pInfo + } + return nil +} + +// delete deletes the pod info from activeQ. +func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error { + aq.lock.Lock() + defer aq.lock.Unlock() + + return aq.queue.Delete(pInfo) +} + // pop removes the head of the queue and returns it. 
// It blocks if the queue is empty and waits until a new item is added to the queue. // It increments scheduling cycle when a pod is popped. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index ed0a4dc8577..25b1e993842 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -860,19 +860,6 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { return !reflect.DeepEqual(strip(oldPod), strip(newPod)) } -func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { - exists := false - p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - if pInfo, exists := unlockedActiveQ.Get(oldPodInfo); exists { - _ = pInfo.Update(newPod) - p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - unlockedActiveQ.AddOrUpdate(pInfo) - exists = true - } - }) - return exists -} - // Update updates a pod in the active or backoff queue if present. Otherwise, it removes // the item from the unschedulable queue if pod is updated in a way that it may // become schedulable and adds the updated one to the active queue. @@ -896,7 +883,8 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { if oldPod != nil { oldPodInfo := newQueuedPodInfoForLookup(oldPod) // If the pod is already in the active queue, just update it there. - if exists := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists { + if pInfo := p.activeQ.update(newPod, oldPodInfo); pInfo != nil { + p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) return } @@ -966,15 +954,13 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) { defer p.lock.Unlock() p.DeleteNominatedPodIfExists(pod) pInfo := newQueuedPodInfoForLookup(pod) - p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - if err := unlockedActiveQ.Delete(pInfo); err != nil { - // The item was probably not found in the activeQ. 
- p.podBackoffQ.Delete(pInfo) - if pInfo = p.unschedulablePods.get(pod); pInfo != nil { - p.unschedulablePods.delete(pod, pInfo.Gated) - } + if err := p.activeQ.delete(pInfo); err != nil { + // The item was probably not found in the activeQ. + p.podBackoffQ.Delete(pInfo) + if pInfo = p.unschedulablePods.get(pod); pInfo != nil { + p.unschedulablePods.delete(pod, pInfo.Gated) } - }) + } } // AssignedPodAdded is called when a bound pod is added. Creation of this pod @@ -1176,7 +1162,7 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { } // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. -func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueuer) *framework.PodInfo { +func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueueReader) *framework.PodInfo { pod := np.toPod() pInfoLookup := newQueuedPodInfoForLookup(pod) @@ -1216,7 +1202,7 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn nominatedPods := p.nominator.nominatedPodsForNode(nodeName) pods := make([]*framework.PodInfo, len(nominatedPods)) - p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { for i, np := range nominatedPods { pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy() } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index bdf4e037ee3..19b4ad1f5f7 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1104,7 +1104,7 @@ func TestPriorityQueue_Update(t *testing.T) { pInfo = pInfoFromBackoff } - q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { if pInfoFromActive, exists := 
unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists { if tt.wantQ != activeQ { t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) From eabdc612ddef961243401b491f035393117cbdb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Wed, 21 Aug 2024 13:40:41 +0000 Subject: [PATCH 3/3] Use queue.Add instead of activeQ.AddOrUpdate in scheduling queue tests --- .../internal/queue/scheduling_queue_test.go | 105 ++++++------------ 1 file changed, 37 insertions(+), 68 deletions(-) diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 19b4ad1f5f7..566fe6aa5b0 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1064,11 +1064,8 @@ func TestPriorityQueue_Update(t *testing.T) { name: "when updating a pod which is in flightPods, the pod will not be added to any queue", wantQ: notInAnyQueue, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
- q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(podInfo) - }) + q.Add(logger, medPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1221,13 +1218,13 @@ func TestPriorityQueue_Activate(t *testing.T) { name string qPodInfoInUnschedulablePods []*framework.QueuedPodInfo qPodInfoInPodBackoffQ []*framework.QueuedPodInfo - qPodInfoInActiveQ []*framework.QueuedPodInfo + qPodInActiveQ []*v1.Pod qPodInfoToActivate *framework.QueuedPodInfo want []*framework.QueuedPodInfo }{ { name: "pod already in activeQ", - qPodInfoInActiveQ: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, + qPodInActiveQ: []*v1.Pod{highPriNominatedPodInfo.Pod}, qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, // 1 already active }, @@ -1259,10 +1256,8 @@ func TestPriorityQueue_Activate(t *testing.T) { q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) // Prepare activeQ/unschedulablePods/podBackoffQ according to the table - for _, qPodInfo := range tt.qPodInfoInActiveQ { - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(qPodInfo) - }) + for _, qPod := range tt.qPodInActiveQ { + q.Add(logger, qPod) } for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { @@ -1569,9 +1564,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. 
} cl := testingclock.NewFakeClock(now) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) - }) + q.Add(logger, test.podInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) } @@ -1615,16 +1608,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) - }) + q.Add(logger, unschedulablePodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) - }) + q.Add(logger, highPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1640,9 +1629,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct a Pod, but don't associate its scheduler failure to any plugin hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) - }) + q.Add(logger, hpp1) if p, err := q.Pop(logger); err != nil || 
p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1655,9 +1642,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp2)) - }) + q.Add(logger, hpp2) if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) } @@ -1692,23 +1677,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) - }) + q.Add(logger, unschedulablePodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) - }) + q.Add(logger, highPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) - }) + q.Add(logger, hpp1) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1968,9 +1947,7 @@ func 
TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) - }) + q.Add(logger, tt.unschedPod) if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) } @@ -2085,15 +2062,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) { defer cancel() q := NewTestQueue(ctx, newDefaultQueueSort()) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) - }) + q.Add(logger, unschedulablePodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) - }) + q.Add(logger, highPriorityPodInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -2439,9 +2412,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. 
- q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) - }) + q.Add(logger, unschedulablePod) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } @@ -2579,15 +2550,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPod)) - }) + q.Add(logger, highPod) if p, err := q.Pop(logger); err != nil || p.Pod != highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) } - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(midPod)) - }) + q.Add(logger, midPod) if p, err := q.Pop(logger); err != nil || p.Pod != midPod { t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) } @@ -2707,9 +2674,7 @@ var ( // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // UnschedulablePlugins will get cleared by Pop, so make a copy first. unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() - queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) - }) + queue.Add(logger, pInfo.Pod) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2725,9 +2690,7 @@ var ( } popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. 
- queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) - }) + queue.Add(logger, pInfo.Pod) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2741,6 +2704,9 @@ var ( } } addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { + queue.Add(logger, pInfo.Pod) + } + addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { unlockedActiveQ.AddOrUpdate(pInfo) }) @@ -2808,8 +2774,9 @@ func TestPodTimestamp(t *testing.T) { { name: "add two pod to activeQ and sort them by the timestamp", operations: []operation{ - addPodActiveQ, - addPodActiveQ, + // Need to add the pods directly to the activeQ to override the timestamps. + addPodActiveQDirectly, + addPodActiveQDirectly, }, operands: []*framework.QueuedPodInfo{pInfo2, pInfo1}, expected: []*framework.QueuedPodInfo{pInfo1, pInfo2}, @@ -2828,7 +2795,8 @@ func TestPodTimestamp(t *testing.T) { { name: "add one pod to BackoffQ and move it to activeQ", operations: []operation{ - addPodActiveQ, + // Need to add the pods directly to activeQ to override the timestamps. 
+ addPodActiveQDirectly, addPodBackoffQ, flushBackoffQ, moveAllToActiveOrBackoffQ, @@ -3294,7 +3262,7 @@ func TestIncomingPodsMetrics(t *testing.T) { operations: []operation{ popAndRequeueAsUnschedulable, }, - want: ` + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 `, }, @@ -3304,7 +3272,8 @@ func TestIncomingPodsMetrics(t *testing.T) { popAndRequeueAsUnschedulable, moveAllToActiveOrBackoffQ, }, - want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="backoff"} 3 `, }, @@ -3315,7 +3284,8 @@ func TestIncomingPodsMetrics(t *testing.T) { moveClockForward, moveAllToActiveOrBackoffQ, }, - want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="active"} 3 `, }, @@ -3326,7 +3296,8 @@ func TestIncomingPodsMetrics(t *testing.T) { moveClockForward, flushBackoffQ, }, - want: ` scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3 + want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3 scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="backoff"} 3 `, }, @@ -3489,9 +3460,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort()) for i, podInfo := range tt.podInfos { // To simulate 
the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) - }) + q.Add(logger, podInfo.Pod) if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) }