Don't expose lock outside activeQueue in scheduling queue

This commit is contained in:
Maciej Skoczeń 2024-08-20 08:51:21 +00:00
parent b3c725627b
commit 9773a39b28
3 changed files with 139 additions and 88 deletions

View File

@ -35,8 +35,8 @@ import (
// getLock() methods should be used only for unlocked() methods // getLock() methods should be used only for unlocked() methods
// and it is forbidden to call any other activeQueuer's method under this lock. // and it is forbidden to call any other activeQueuer's method under this lock.
type activeQueuer interface { type activeQueuer interface {
getLock() *sync.RWMutex underLock(func(unlockedActiveQ unlockedActiveQueuer))
unlocked() unlockedActiveQueuer underRLock(func(unlockedActiveQ unlockedActiveQueuer))
pop(logger klog.Logger) (*framework.QueuedPodInfo, error) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
list() []*v1.Pod list() []*v1.Pod
@ -56,7 +56,7 @@ type activeQueuer interface {
} }
// unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself.
// getLock() methods should be used to protect these methods. // underLock() and underRLock() methods should be used to protect these methods.
type unlockedActiveQueuer interface { type unlockedActiveQueuer interface {
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
Has(pInfo *framework.QueuedPodInfo) bool Has(pInfo *framework.QueuedPodInfo) bool
@ -130,15 +130,22 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
return aq return aq
} }
// getLock returns lock of activeQueue. Its methods should be used only to protect the unlocked() methods. // underLock runs the fn function under the lock.Lock.
func (aq *activeQueue) getLock() *sync.RWMutex { // fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method,
return &aq.lock // as it would end up in deadlock.
func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) {
aq.lock.Lock()
defer aq.lock.Unlock()
fn(aq.queue)
} }
// unlocked returns queue methods, that are not protected by the lock itself. // underLock runs the fn function under the lock.RLock.
// getLock() methods should be used to protect queue methods. // fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method,
func (aq *activeQueue) unlocked() unlockedActiveQueuer { // as it would end up in deadlock.
return aq.queue func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueuer)) {
aq.lock.RLock()
defer aq.lock.RUnlock()
fn(aq.queue)
} }
// pop removes the head of the queue and returns it. // pop removes the head of the queue and returns it.

View File

@ -541,35 +541,36 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
gatedBefore := pInfo.Gated gatedBefore := pInfo.Gated
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
p.activeQ.getLock().Lock() added := false
defer p.activeQ.getLock().Unlock() p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
if pInfo.Gated { if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
if p.activeQ.unlocked().Has(pInfo) { if unlockedActiveQ.Has(pInfo) {
return false return
}
if p.podBackoffQ.Has(pInfo) {
return
}
p.unschedulablePods.addOrUpdate(pInfo)
return
} }
if p.podBackoffQ.Has(pInfo) { if pInfo.InitialAttemptTimestamp == nil {
return false now := p.clock.Now()
pInfo.InitialAttemptTimestamp = &now
} }
p.unschedulablePods.addOrUpdate(pInfo)
return false
}
if pInfo.InitialAttemptTimestamp == nil {
now := p.clock.Now()
pInfo.InitialAttemptTimestamp = &now
}
p.activeQ.unlocked().AddOrUpdate(pInfo) unlockedActiveQ.AddOrUpdate(pInfo)
added = true
p.unschedulablePods.delete(pInfo.Pod, gatedBefore) p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
if event == framework.PodAdd || event == framework.PodUpdate { if event == framework.PodAdd || event == framework.PodUpdate {
p.AddNominatedPod(logger, pInfo.PodInfo, nil) p.AddNominatedPod(logger, pInfo.PodInfo, nil)
} }
})
return true return added
} }
// Add adds a pod to the active queue. It should be called only when a new pod // Add adds a pod to the active queue. It should be called only when a new pod
@ -860,15 +861,16 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
} }
func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool {
p.activeQ.getLock().Lock() exists := false
defer p.activeQ.getLock().Unlock() p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
if pInfo, exists := p.activeQ.unlocked().Get(oldPodInfo); exists { if pInfo, exists := unlockedActiveQ.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod) _ = pInfo.Update(newPod)
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
p.activeQ.unlocked().AddOrUpdate(pInfo) unlockedActiveQ.AddOrUpdate(pInfo)
return true exists = true
} }
return false })
return exists
} }
// Update updates a pod in the active or backoff queue if present. Otherwise, it removes // Update updates a pod in the active or backoff queue if present. Otherwise, it removes
@ -964,15 +966,15 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) {
defer p.lock.Unlock() defer p.lock.Unlock()
p.DeleteNominatedPodIfExists(pod) p.DeleteNominatedPodIfExists(pod)
pInfo := newQueuedPodInfoForLookup(pod) pInfo := newQueuedPodInfoForLookup(pod)
p.activeQ.getLock().Lock() p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
defer p.activeQ.getLock().Unlock() if err := unlockedActiveQ.Delete(pInfo); err != nil {
if err := p.activeQ.unlocked().Delete(pInfo); err != nil { // The item was probably not found in the activeQ.
// The item was probably not found in the activeQ. p.podBackoffQ.Delete(pInfo)
p.podBackoffQ.Delete(pInfo) if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
if pInfo = p.unschedulablePods.get(pod); pInfo != nil { p.unschedulablePods.delete(pod, pInfo.Gated)
p.unschedulablePods.delete(pod, pInfo.Gated) }
} }
} })
} }
// AssignedPodAdded is called when a bound pod is added. Creation of this pod // AssignedPodAdded is called when a bound pod is added. Creation of this pod
@ -1174,11 +1176,11 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
} }
// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
func (p *PriorityQueue) nominatedPodToInfo(np podRef) *framework.PodInfo { func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueuer) *framework.PodInfo {
pod := np.toPod() pod := np.toPod()
pInfoLookup := newQueuedPodInfoForLookup(pod) pInfoLookup := newQueuedPodInfoForLookup(pod)
queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup) queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup)
if exists { if exists {
return queuedPodInfo.PodInfo return queuedPodInfo.PodInfo
} }
@ -1213,12 +1215,12 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn
defer p.lock.RUnlock() defer p.lock.RUnlock()
nominatedPods := p.nominator.nominatedPodsForNode(nodeName) nominatedPods := p.nominator.nominatedPodsForNode(nodeName)
p.activeQ.getLock().RLock()
defer p.activeQ.getLock().RUnlock()
pods := make([]*framework.PodInfo, len(nominatedPods)) pods := make([]*framework.PodInfo, len(nominatedPods))
for i, np := range nominatedPods { p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) {
pods[i] = p.nominatedPodToInfo(np).DeepCopy() for i, np := range nominatedPods {
} pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy()
}
})
return pods return pods
} }

View File

@ -1066,7 +1066,9 @@ func TestPriorityQueue_Update(t *testing.T) {
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
// We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
q.activeQ.unlocked().AddOrUpdate(podInfo) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(podInfo)
})
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
@ -1102,12 +1104,14 @@ func TestPriorityQueue_Update(t *testing.T) {
pInfo = pInfoFromBackoff pInfo = pInfoFromBackoff
} }
if pInfoFromActive, exists := q.activeQ.unlocked().Get(newQueuedPodInfoForLookup(newPod)); exists { q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) {
if tt.wantQ != activeQ { if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) if tt.wantQ != activeQ {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name)
}
pInfo = pInfoFromActive
} }
pInfo = pInfoFromActive })
}
if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil {
if tt.wantQ != unschedulablePods { if tt.wantQ != unschedulablePods {
@ -1197,10 +1201,10 @@ func TestPriorityQueue_Delete(t *testing.T) {
q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(logger, unschedulablePodInfo.Pod) q.Add(logger, unschedulablePodInfo.Pod)
q.Delete(highPriNominatedPodInfo.Pod) q.Delete(highPriNominatedPodInfo.Pod)
if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { if !q.activeQ.has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) {
t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name)
} }
if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { if q.activeQ.has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name)
} }
if len(q.nominator.nominatedPods) != 1 { if len(q.nominator.nominatedPods) != 1 {
@ -1256,7 +1260,9 @@ func TestPriorityQueue_Activate(t *testing.T) {
// Prepare activeQ/unschedulablePods/podBackoffQ according to the table // Prepare activeQ/unschedulablePods/podBackoffQ according to the table
for _, qPodInfo := range tt.qPodInfoInActiveQ { for _, qPodInfo := range tt.qPodInfoInActiveQ {
q.activeQ.unlocked().AddOrUpdate(qPodInfo) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(qPodInfo)
})
} }
for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { for _, qPodInfo := range tt.qPodInfoInUnschedulablePods {
@ -1277,7 +1283,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
// Check if the specific pod exists in activeQ // Check if the specific pod exists in activeQ
for _, want := range tt.want { for _, want := range tt.want {
if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { if !q.activeQ.has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) {
t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name) t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name)
} }
} }
@ -1563,7 +1569,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
} }
cl := testingclock.NewFakeClock(now) cl := testingclock.NewFakeClock(now)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
} }
@ -1607,12 +1615,16 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
} }
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
@ -1628,7 +1640,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q) expectInFlightPods(t, q)
// Construct a Pod, but don't associate its scheduler failure to any plugin // Construct a Pod, but don't associate its scheduler failure to any plugin
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1))
})
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
} }
@ -1641,7 +1655,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q) expectInFlightPods(t, q)
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin". // Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp2)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp2))
})
if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
} }
@ -1676,17 +1692,23 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
} }
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(hpp1))
})
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
} }
@ -1946,7 +1968,9 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod {
t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name)
} }
@ -1961,7 +1985,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q.AssignedPodAdded(logger, tt.updatedAssignedPod) q.AssignedPodAdded(logger, tt.updatedAssignedPod)
if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { if q.activeQ.has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue {
t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue)
} }
}) })
@ -2061,11 +2085,15 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
defer cancel() defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort()) q := NewTestQueue(ctx, newDefaultQueueSort())
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
@ -2411,7 +2439,9 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
}) })
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
} }
@ -2549,11 +2579,15 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
}) })
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(highPod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != highPod { if p, err := q.Pop(logger); err != nil || p.Pod != highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name)
} }
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(midPod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(midPod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != midPod { if p, err := q.Pop(logger); err != nil || p.Pod != midPod {
t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name)
} }
@ -2673,7 +2707,9 @@ var (
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
// UnschedulablePlugins will get cleared by Pop, so make a copy first. // UnschedulablePlugins will get cleared by Pop, so make a copy first.
unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() unschedulablePlugins := pInfo.UnschedulablePlugins.Clone()
queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
})
p, err := queue.Pop(logger) p, err := queue.Pop(logger)
if err != nil { if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err) t.Fatalf("Unexpected error during Pop: %v", err)
@ -2689,7 +2725,9 @@ var (
} }
popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
})
p, err := queue.Pop(logger) p, err := queue.Pop(logger)
if err != nil { if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err) t.Fatalf("Unexpected error during Pop: %v", err)
@ -2703,7 +2741,9 @@ var (
} }
} }
addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.unlocked().AddOrUpdate(pInfo) queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(pInfo)
})
} }
addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
if !pInfo.Gated { if !pInfo.Gated {
@ -3449,7 +3489,9 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort()) q := NewTestQueue(ctx, newDefaultQueueSort())
for i, podInfo := range tt.podInfos { for i, podInfo := range tt.podInfos {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod))
})
if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
} }