Merge pull request #126808 from macsko/move_activeq_fields_follow_up

Don't expose lock outside activeQueue in scheduling queue
This commit is contained in:
Kubernetes Prow Robot 2024-08-22 20:33:47 +01:00 committed by GitHub
commit e955c1d6a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 135 additions and 100 deletions

View File

@ -35,9 +35,11 @@ import (
// getLock() methods should be used only for unlocked() methods // getLock() methods should be used only for unlocked() methods
// and it is forbidden to call any other activeQueuer's method under this lock. // and it is forbidden to call any other activeQueuer's method under this lock.
type activeQueuer interface { type activeQueuer interface {
getLock() *sync.RWMutex underLock(func(unlockedActiveQ unlockedActiveQueuer))
unlocked() unlockedActiveQueuer underRLock(func(unlockedActiveQ unlockedActiveQueueReader))
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
delete(pInfo *framework.QueuedPodInfo) error
pop(logger klog.Logger) (*framework.QueuedPodInfo, error) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
list() []*v1.Pod list() []*v1.Pod
len() int len() int
@ -56,12 +58,17 @@ type activeQueuer interface {
} }
// unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself.
// getLock() methods should be used to protect these methods. // underLock() method should be used to protect these methods.
type unlockedActiveQueuer interface { type unlockedActiveQueuer interface {
unlockedActiveQueueReader
AddOrUpdate(pInfo *framework.QueuedPodInfo)
}
// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself.
// underLock() or underRLock() method should be used to protect these methods.
type unlockedActiveQueueReader interface {
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
Has(pInfo *framework.QueuedPodInfo) bool Has(pInfo *framework.QueuedPodInfo) bool
AddOrUpdate(pInfo *framework.QueuedPodInfo)
Delete(pInfo *framework.QueuedPodInfo) error
} }
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock. // activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
@ -130,15 +137,44 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
return aq return aq
} }
// getLock returns lock of activeQueue. Its methods should be used only to protect the unlocked() methods. // underLock runs the fn function under the lock.Lock.
func (aq *activeQueue) getLock() *sync.RWMutex { // fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method,
return &aq.lock // as it would end up in deadlock.
func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) {
aq.lock.Lock()
defer aq.lock.Unlock()
fn(aq.queue)
} }
// unlocked returns queue methods, that are not protected by the lock itself. // underLock runs the fn function under the lock.RLock.
// getLock() methods should be used to protect queue methods. // fn can run unlockedActiveQueueReader methods but should NOT run any other activeQueue method,
func (aq *activeQueue) unlocked() unlockedActiveQueuer { // as it would end up in deadlock.
return aq.queue func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) {
aq.lock.RLock()
defer aq.lock.RUnlock()
fn(aq.queue)
}
// update updates the pod in activeQ if oldPodInfo is already in the queue.
// It returns new pod info if updated, nil otherwise.
func (aq *activeQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
aq.lock.Lock()
defer aq.lock.Unlock()
if pInfo, exists := aq.queue.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
aq.queue.AddOrUpdate(pInfo)
return pInfo
}
return nil
}
// delete deletes the pod info from activeQ.
func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error {
aq.lock.Lock()
defer aq.lock.Unlock()
return aq.queue.Delete(pInfo)
} }
// pop removes the head of the queue and returns it. // pop removes the head of the queue and returns it.

View File

@ -541,35 +541,36 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
gatedBefore := pInfo.Gated gatedBefore := pInfo.Gated
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
p.activeQ.getLock().Lock() added := false
defer p.activeQ.getLock().Unlock() p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
if pInfo.Gated { if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
if p.activeQ.unlocked().Has(pInfo) { if unlockedActiveQ.Has(pInfo) {
return false return
}
if p.podBackoffQ.Has(pInfo) {
return
}
p.unschedulablePods.addOrUpdate(pInfo)
return
} }
if p.podBackoffQ.Has(pInfo) { if pInfo.InitialAttemptTimestamp == nil {
return false now := p.clock.Now()
pInfo.InitialAttemptTimestamp = &now
} }
p.unschedulablePods.addOrUpdate(pInfo)
return false
}
if pInfo.InitialAttemptTimestamp == nil {
now := p.clock.Now()
pInfo.InitialAttemptTimestamp = &now
}
p.activeQ.unlocked().AddOrUpdate(pInfo) unlockedActiveQ.AddOrUpdate(pInfo)
added = true
p.unschedulablePods.delete(pInfo.Pod, gatedBefore) p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
if event == framework.PodAdd || event == framework.PodUpdate { if event == framework.PodAdd || event == framework.PodUpdate {
p.AddNominatedPod(logger, pInfo.PodInfo, nil) p.AddNominatedPod(logger, pInfo.PodInfo, nil)
} }
})
return true return added
} }
// Add adds a pod to the active queue. It should be called only when a new pod // Add adds a pod to the active queue. It should be called only when a new pod
@ -859,18 +860,6 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
return !reflect.DeepEqual(strip(oldPod), strip(newPod)) return !reflect.DeepEqual(strip(oldPod), strip(newPod))
} }
func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool {
p.activeQ.getLock().Lock()
defer p.activeQ.getLock().Unlock()
if pInfo, exists := p.activeQ.unlocked().Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
p.activeQ.unlocked().AddOrUpdate(pInfo)
return true
}
return false
}
// Update updates a pod in the active or backoff queue if present. Otherwise, it removes // Update updates a pod in the active or backoff queue if present. Otherwise, it removes
// the item from the unschedulable queue if pod is updated in a way that it may // the item from the unschedulable queue if pod is updated in a way that it may
// become schedulable and adds the updated one to the active queue. // become schedulable and adds the updated one to the active queue.
@ -894,7 +883,8 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
if oldPod != nil { if oldPod != nil {
oldPodInfo := newQueuedPodInfoForLookup(oldPod) oldPodInfo := newQueuedPodInfoForLookup(oldPod)
// If the pod is already in the active queue, just update it there. // If the pod is already in the active queue, just update it there.
if exists := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists { if pInfo := p.activeQ.update(newPod, oldPodInfo); pInfo != nil {
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
return return
} }
@ -964,9 +954,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) {
defer p.lock.Unlock() defer p.lock.Unlock()
p.DeleteNominatedPodIfExists(pod) p.DeleteNominatedPodIfExists(pod)
pInfo := newQueuedPodInfoForLookup(pod) pInfo := newQueuedPodInfoForLookup(pod)
p.activeQ.getLock().Lock() if err := p.activeQ.delete(pInfo); err != nil {
defer p.activeQ.getLock().Unlock()
if err := p.activeQ.unlocked().Delete(pInfo); err != nil {
// The item was probably not found in the activeQ. // The item was probably not found in the activeQ.
p.podBackoffQ.Delete(pInfo) p.podBackoffQ.Delete(pInfo)
if pInfo = p.unschedulablePods.get(pod); pInfo != nil { if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
@ -1174,11 +1162,11 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
} }
// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
func (p *PriorityQueue) nominatedPodToInfo(np podRef) *framework.PodInfo { func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueueReader) *framework.PodInfo {
pod := np.toPod() pod := np.toPod()
pInfoLookup := newQueuedPodInfoForLookup(pod) pInfoLookup := newQueuedPodInfoForLookup(pod)
queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup) queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup)
if exists { if exists {
return queuedPodInfo.PodInfo return queuedPodInfo.PodInfo
} }
@ -1213,12 +1201,12 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn
defer p.lock.RUnlock() defer p.lock.RUnlock()
nominatedPods := p.nominator.nominatedPodsForNode(nodeName) nominatedPods := p.nominator.nominatedPodsForNode(nodeName)
p.activeQ.getLock().RLock()
defer p.activeQ.getLock().RUnlock()
pods := make([]*framework.PodInfo, len(nominatedPods)) pods := make([]*framework.PodInfo, len(nominatedPods))
for i, np := range nominatedPods { p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) {
pods[i] = p.nominatedPodToInfo(np).DeepCopy() for i, np := range nominatedPods {
} pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy()
}
})
return pods return pods
} }

View File

@ -1064,9 +1064,8 @@ func TestPriorityQueue_Update(t *testing.T) {
name: "when updating a pod which is in flightPods, the pod will not be added to any queue", name: "when updating a pod which is in flightPods, the pod will not be added to any queue",
wantQ: notInAnyQueue, wantQ: notInAnyQueue,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
// We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
q.activeQ.unlocked().AddOrUpdate(podInfo) q.Add(logger, medPriorityPodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
@ -1102,12 +1101,14 @@ func TestPriorityQueue_Update(t *testing.T) {
pInfo = pInfoFromBackoff pInfo = pInfoFromBackoff
} }
if pInfoFromActive, exists := q.activeQ.unlocked().Get(newQueuedPodInfoForLookup(newPod)); exists { q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) {
if tt.wantQ != activeQ { if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) if tt.wantQ != activeQ {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name)
}
pInfo = pInfoFromActive
} }
pInfo = pInfoFromActive })
}
if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil {
if tt.wantQ != unschedulablePods { if tt.wantQ != unschedulablePods {
@ -1197,10 +1198,10 @@ func TestPriorityQueue_Delete(t *testing.T) {
q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(logger, unschedulablePodInfo.Pod) q.Add(logger, unschedulablePodInfo.Pod)
q.Delete(highPriNominatedPodInfo.Pod) q.Delete(highPriNominatedPodInfo.Pod)
if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { if !q.activeQ.has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) {
t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name)
} }
if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { if q.activeQ.has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name)
} }
if len(q.nominator.nominatedPods) != 1 { if len(q.nominator.nominatedPods) != 1 {
@ -1217,13 +1218,13 @@ func TestPriorityQueue_Activate(t *testing.T) {
name string name string
qPodInfoInUnschedulablePods []*framework.QueuedPodInfo qPodInfoInUnschedulablePods []*framework.QueuedPodInfo
qPodInfoInPodBackoffQ []*framework.QueuedPodInfo qPodInfoInPodBackoffQ []*framework.QueuedPodInfo
qPodInfoInActiveQ []*framework.QueuedPodInfo qPodInActiveQ []*v1.Pod
qPodInfoToActivate *framework.QueuedPodInfo qPodInfoToActivate *framework.QueuedPodInfo
want []*framework.QueuedPodInfo want []*framework.QueuedPodInfo
}{ }{
{ {
name: "pod already in activeQ", name: "pod already in activeQ",
qPodInfoInActiveQ: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, qPodInActiveQ: []*v1.Pod{highPriNominatedPodInfo.Pod},
qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo}, qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, // 1 already active want: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}}, // 1 already active
}, },
@ -1255,8 +1256,8 @@ func TestPriorityQueue_Activate(t *testing.T) {
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
// Prepare activeQ/unschedulablePods/podBackoffQ according to the table // Prepare activeQ/unschedulablePods/podBackoffQ according to the table
for _, qPodInfo := range tt.qPodInfoInActiveQ { for _, qPod := range tt.qPodInActiveQ {
q.activeQ.unlocked().AddOrUpdate(qPodInfo) q.Add(logger, qPod)
} }
for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { for _, qPodInfo := range tt.qPodInfoInUnschedulablePods {
@ -1277,7 +1278,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
// Check if the specific pod exists in activeQ // Check if the specific pod exists in activeQ
for _, want := range tt.want { for _, want := range tt.want {
if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { if !q.activeQ.has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) {
t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name) t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name)
} }
} }
@ -1563,7 +1564,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
} }
cl := testingclock.NewFakeClock(now) cl := testingclock.NewFakeClock(now)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) q.Add(logger, test.podInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
} }
@ -1607,12 +1608,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
} }
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) q.Add(logger, unschedulablePodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) q.Add(logger, highPriorityPodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
@ -1628,7 +1629,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q) expectInFlightPods(t, q)
// Construct a Pod, but don't associate its scheduler failure to any plugin // Construct a Pod, but don't associate its scheduler failure to any plugin
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) q.Add(logger, hpp1)
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
} }
@ -1641,7 +1642,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q) expectInFlightPods(t, q)
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin". // Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp2)) q.Add(logger, hpp2)
if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
} }
@ -1676,17 +1677,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
} }
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) q.Add(logger, unschedulablePodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) q.Add(logger, highPriorityPodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) q.Add(logger, hpp1)
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
} }
@ -1946,7 +1947,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) q.Add(logger, tt.unschedPod)
if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod {
t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name)
} }
@ -1961,7 +1962,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q.AssignedPodAdded(logger, tt.updatedAssignedPod) q.AssignedPodAdded(logger, tt.updatedAssignedPod)
if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { if q.activeQ.has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue {
t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue)
} }
}) })
@ -2061,11 +2062,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
defer cancel() defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort()) q := NewTestQueue(ctx, newDefaultQueueSort())
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) q.Add(logger, unschedulablePodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) q.Add(logger, highPriorityPodInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
@ -2411,7 +2412,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
}) })
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) q.Add(logger, unschedulablePod)
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
} }
@ -2549,11 +2550,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
}) })
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPod)) q.Add(logger, highPod)
if p, err := q.Pop(logger); err != nil || p.Pod != highPod { if p, err := q.Pop(logger); err != nil || p.Pod != highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name)
} }
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(midPod)) q.Add(logger, midPod)
if p, err := q.Pop(logger); err != nil || p.Pod != midPod { if p, err := q.Pop(logger); err != nil || p.Pod != midPod {
t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name)
} }
@ -2673,7 +2674,7 @@ var (
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
// UnschedulablePlugins will get cleared by Pop, so make a copy first. // UnschedulablePlugins will get cleared by Pop, so make a copy first.
unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() unschedulablePlugins := pInfo.UnschedulablePlugins.Clone()
queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) queue.Add(logger, pInfo.Pod)
p, err := queue.Pop(logger) p, err := queue.Pop(logger)
if err != nil { if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err) t.Fatalf("Unexpected error during Pop: %v", err)
@ -2689,7 +2690,7 @@ var (
} }
popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) queue.Add(logger, pInfo.Pod)
p, err := queue.Pop(logger) p, err := queue.Pop(logger)
if err != nil { if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err) t.Fatalf("Unexpected error during Pop: %v", err)
@ -2703,7 +2704,12 @@ var (
} }
} }
addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.unlocked().AddOrUpdate(pInfo) queue.Add(logger, pInfo.Pod)
}
addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(pInfo)
})
} }
addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
if !pInfo.Gated { if !pInfo.Gated {
@ -2768,8 +2774,9 @@ func TestPodTimestamp(t *testing.T) {
{ {
name: "add two pod to activeQ and sort them by the timestamp", name: "add two pod to activeQ and sort them by the timestamp",
operations: []operation{ operations: []operation{
addPodActiveQ, // Need to add the pods directly to the activeQ to override the timestamps.
addPodActiveQ, addPodActiveQDirectly,
addPodActiveQDirectly,
}, },
operands: []*framework.QueuedPodInfo{pInfo2, pInfo1}, operands: []*framework.QueuedPodInfo{pInfo2, pInfo1},
expected: []*framework.QueuedPodInfo{pInfo1, pInfo2}, expected: []*framework.QueuedPodInfo{pInfo1, pInfo2},
@ -2788,7 +2795,8 @@ func TestPodTimestamp(t *testing.T) {
{ {
name: "add one pod to BackoffQ and move it to activeQ", name: "add one pod to BackoffQ and move it to activeQ",
operations: []operation{ operations: []operation{
addPodActiveQ, // Need to add the pods directly to activeQ to override the timestamps.
addPodActiveQDirectly,
addPodBackoffQ, addPodBackoffQ,
flushBackoffQ, flushBackoffQ,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
@ -3254,7 +3262,7 @@ func TestIncomingPodsMetrics(t *testing.T) {
operations: []operation{ operations: []operation{
popAndRequeueAsUnschedulable, popAndRequeueAsUnschedulable,
}, },
want: ` want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3
`, `,
}, },
@ -3264,7 +3272,8 @@ func TestIncomingPodsMetrics(t *testing.T) {
popAndRequeueAsUnschedulable, popAndRequeueAsUnschedulable,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
}, },
want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3
scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="backoff"} 3 scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="backoff"} 3
`, `,
}, },
@ -3275,7 +3284,8 @@ func TestIncomingPodsMetrics(t *testing.T) {
moveClockForward, moveClockForward,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
}, },
want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3
scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="active"} 3 scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="active"} 3
`, `,
}, },
@ -3286,7 +3296,8 @@ func TestIncomingPodsMetrics(t *testing.T) {
moveClockForward, moveClockForward,
flushBackoffQ, flushBackoffQ,
}, },
want: ` scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3 want: `scheduler_queue_incoming_pods_total{event="PodAdd",queue="active"} 3
scheduler_queue_incoming_pods_total{event="BackoffComplete",queue="active"} 3
scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="backoff"} 3 scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="backoff"} 3
`, `,
}, },
@ -3449,7 +3460,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort()) q := NewTestQueue(ctx, newDefaultQueueSort())
for i, podInfo := range tt.podInfos { for i, podInfo := range tt.podInfos {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) q.Add(logger, podInfo.Pod)
if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
} }