From 9773a39b28a30dae6b8b471205b6e746440f068c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Tue, 20 Aug 2024 08:51:21 +0000 Subject: [PATCH] Don't expose lock outside activeQueue in scheduling queue --- pkg/scheduler/internal/queue/active_queue.go | 27 +++-- .../internal/queue/scheduling_queue.go | 100 +++++++++--------- .../internal/queue/scheduling_queue_test.go | 100 +++++++++++++----- 3 files changed, 139 insertions(+), 88 deletions(-) diff --git a/pkg/scheduler/internal/queue/active_queue.go b/pkg/scheduler/internal/queue/active_queue.go index 91149bf5752..4eb19a9448f 100644 --- a/pkg/scheduler/internal/queue/active_queue.go +++ b/pkg/scheduler/internal/queue/active_queue.go @@ -35,8 +35,8 @@ import ( // getLock() methods should be used only for unlocked() methods // and it is forbidden to call any other activeQueuer's method under this lock. type activeQueuer interface { - getLock() *sync.RWMutex - unlocked() unlockedActiveQueuer + underLock(func(unlockedActiveQ unlockedActiveQueuer)) + underRLock(func(unlockedActiveQ unlockedActiveQueuer)) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) list() []*v1.Pod @@ -56,7 +56,7 @@ type activeQueuer interface { } // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. -// getLock() methods should be used to protect these methods. +// underLock() and underRLock() methods should be used to protect these methods. type unlockedActiveQueuer interface { Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) Has(pInfo *framework.QueuedPodInfo) bool @@ -130,15 +130,22 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu return aq } -// getLock returns lock of activeQueue. Its methods should be used only to protect the unlocked() methods. -func (aq *activeQueue) getLock() *sync.RWMutex { - return &aq.lock +// underLock runs the fn function under the lock.Lock. +// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method, +// as it would end up in deadlock. +func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { + aq.lock.Lock() + defer aq.lock.Unlock() + fn(aq.queue) } -// unlocked returns queue methods, that are not protected by the lock itself. -// getLock() methods should be used to protect queue methods. -func (aq *activeQueue) unlocked() unlockedActiveQueuer { - return aq.queue +// underLock runs the fn function under the lock.RLock. +// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method, +// as it would end up in deadlock. +func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { + aq.lock.RLock() + defer aq.lock.RUnlock() + fn(aq.queue) } // pop removes the head of the queue and returns it. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index b4fb251f742..ed0a4dc8577 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -541,35 +541,36 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue gatedBefore := pInfo.Gated pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if pInfo.Gated { - // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. - if p.activeQ.unlocked().Has(pInfo) { - return false + added := false + p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + if pInfo.Gated { + // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. + if unlockedActiveQ.Has(pInfo) { + return + } + if p.podBackoffQ.Has(pInfo) { + return + } + p.unschedulablePods.addOrUpdate(pInfo) + return } - if p.podBackoffQ.Has(pInfo) { - return false + if pInfo.InitialAttemptTimestamp == nil { + now := p.clock.Now() + pInfo.InitialAttemptTimestamp = &now } - p.unschedulablePods.addOrUpdate(pInfo) - return false - } - if pInfo.InitialAttemptTimestamp == nil { - now := p.clock.Now() - pInfo.InitialAttemptTimestamp = &now - } - p.activeQ.unlocked().AddOrUpdate(pInfo) + unlockedActiveQ.AddOrUpdate(pInfo) + added = true - p.unschedulablePods.delete(pInfo.Pod, gatedBefore) - _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) - metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() - if event == framework.PodAdd || event == framework.PodUpdate { - p.AddNominatedPod(logger, pInfo.PodInfo, nil) - } - - return true + p.unschedulablePods.delete(pInfo.Pod, gatedBefore) + _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() + if event == framework.PodAdd || event == framework.PodUpdate { + p.AddNominatedPod(logger, pInfo.PodInfo, nil) + } + }) + return added } // Add adds a pod to the active queue. It should be called only when a new pod @@ -860,15 +861,16 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { } func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if pInfo, exists := p.activeQ.unlocked().Get(oldPodInfo); exists { - _ = pInfo.Update(newPod) - p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - p.activeQ.unlocked().AddOrUpdate(pInfo) - return true - } - return false + exists := false + p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + if pInfo, exists := unlockedActiveQ.Get(oldPodInfo); exists { + _ = pInfo.Update(newPod) + p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) + unlockedActiveQ.AddOrUpdate(pInfo) + exists = true + } + }) + return exists } // Update updates a pod in the active or backoff queue if present. Otherwise, it removes @@ -964,15 +966,15 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) { defer p.lock.Unlock() p.DeleteNominatedPodIfExists(pod) pInfo := newQueuedPodInfoForLookup(pod) - p.activeQ.getLock().Lock() - defer p.activeQ.getLock().Unlock() - if err := p.activeQ.unlocked().Delete(pInfo); err != nil { - // The item was probably not found in the activeQ. - p.podBackoffQ.Delete(pInfo) - if pInfo = p.unschedulablePods.get(pod); pInfo != nil { - p.unschedulablePods.delete(pod, pInfo.Gated) + p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + if err := unlockedActiveQ.Delete(pInfo); err != nil { + // The item was probably not found in the activeQ. + p.podBackoffQ.Delete(pInfo) + if pInfo = p.unschedulablePods.get(pod); pInfo != nil { + p.unschedulablePods.delete(pod, pInfo.Gated) + } } - } + }) } // AssignedPodAdded is called when a bound pod is added. Creation of this pod @@ -1174,11 +1176,11 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { } // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. -func (p *PriorityQueue) nominatedPodToInfo(np podRef) *framework.PodInfo { +func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueuer) *framework.PodInfo { pod := np.toPod() pInfoLookup := newQueuedPodInfoForLookup(pod) - queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup) + queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup) if exists { return queuedPodInfo.PodInfo } @@ -1213,12 +1215,12 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn defer p.lock.RUnlock() nominatedPods := p.nominator.nominatedPodsForNode(nodeName) - p.activeQ.getLock().RLock() - defer p.activeQ.getLock().RUnlock() pods := make([]*framework.PodInfo, len(nominatedPods)) - for i, np := range nominatedPods { - pods[i] = p.nominatedPodToInfo(np).DeepCopy() - } + p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + for i, np := range nominatedPods { + pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy() + } + }) return pods } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 9a6e4cc1e9b..bdf4e037ee3 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1066,7 +1066,9 @@ func TestPriorityQueue_Update(t *testing.T) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. - q.activeQ.unlocked().AddOrUpdate(podInfo) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(podInfo) + }) if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1102,12 +1104,14 @@ func TestPriorityQueue_Update(t *testing.T) { pInfo = pInfoFromBackoff } - if pInfoFromActive, exists := q.activeQ.unlocked().Get(newQueuedPodInfoForLookup(newPod)); exists { - if tt.wantQ != activeQ { - t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if tt.wantQ != activeQ { + t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) + } + pInfo = pInfoFromActive } - pInfo = pInfoFromActive - } + }) if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { if tt.wantQ != unschedulablePods { @@ -1197,10 +1201,10 @@ func TestPriorityQueue_Delete(t *testing.T) { q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(logger, unschedulablePodInfo.Pod) q.Delete(highPriNominatedPodInfo.Pod) - if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { + if !q.activeQ.has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) } - if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { + if q.activeQ.has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } if len(q.nominator.nominatedPods) != 1 { @@ -1256,7 +1260,9 @@ func TestPriorityQueue_Activate(t *testing.T) { // Prepare activeQ/unschedulablePods/podBackoffQ according to the table for _, qPodInfo := range tt.qPodInfoInActiveQ { - q.activeQ.unlocked().AddOrUpdate(qPodInfo) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(qPodInfo) + }) } for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { @@ -1277,7 +1283,7 @@ func TestPriorityQueue_Activate(t *testing.T) { // Check if the specific pod exists in activeQ for _, want := range tt.want { - if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { + if !q.activeQ.has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name) } } @@ -1563,7 +1569,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. } cl := testingclock.NewFakeClock(now) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) } @@ -1607,12 +1615,16 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1628,7 +1640,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct a Pod, but don't associate its scheduler failure to any plugin hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1641,7 +1655,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp2)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp2)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) } @@ -1676,17 +1692,23 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1946,7 +1968,9 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) } @@ -1961,7 +1985,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q.AssignedPodAdded(logger, tt.updatedAssignedPod) - if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { + if q.activeQ.has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) } }) @@ -2061,11 +2085,15 @@ func TestPriorityQueue_PendingPods(t *testing.T) { defer cancel() q := NewTestQueue(ctx, newDefaultQueueSort()) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -2411,7 +2439,9 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } @@ -2549,11 +2579,15 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) } - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(midPod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(midPod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != midPod { t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) } @@ -2673,7 +2707,9 @@ var ( // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // UnschedulablePlugins will get cleared by Pop, so make a copy first. unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() - queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + }) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2689,7 +2725,9 @@ var ( } popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + }) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2703,7 +2741,9 @@ var ( } } addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.activeQ.unlocked().AddOrUpdate(pInfo) + queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(pInfo) + }) } addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { if !pInfo.Gated { @@ -3449,7 +3489,9 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort()) for i, podInfo := range tt.podInfos { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) + q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { + unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) + }) if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) }