Add missing increments of queue_incoming_pods_total metric in scheduling queue

This commit is contained in:
Maciej Skoczeń 2025-02-25 13:05:33 +00:00
parent 566f939b19
commit 6975572a80
6 changed files with 98 additions and 51 deletions

View File

@ -61,14 +61,55 @@ type activeQueuer interface {
// underLock() method should be used to protect these methods. // underLock() method should be used to protect these methods.
type unlockedActiveQueuer interface { type unlockedActiveQueuer interface {
unlockedActiveQueueReader unlockedActiveQueueReader
AddOrUpdate(pInfo *framework.QueuedPodInfo) // add adds a new pod to the activeQ.
// The event should show which event triggered this addition and is used for the metric recording.
// This method should be called in activeQueue.underLock().
add(pInfo *framework.QueuedPodInfo, event string)
} }
// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself. // unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself.
// underLock() or underRLock() method should be used to protect these methods. // underLock() or underRLock() method should be used to protect these methods.
type unlockedActiveQueueReader interface { type unlockedActiveQueueReader interface {
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) // get returns the pod matching pInfo inside the activeQ.
Has(pInfo *framework.QueuedPodInfo) bool // Returns false if the pInfo doesn't exist in the queue.
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
// has returns if pInfo exists in the queue.
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
has(pInfo *framework.QueuedPodInfo) bool
}
// unlockedActiveQueue defines activeQ methods that are not protected by the lock itself.
// activeQueue.underLock() or activeQueue.underRLock() method should be used to protect these methods.
type unlockedActiveQueue struct {
queue *heap.Heap[*framework.QueuedPodInfo]
}
func newUnlockedActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo]) *unlockedActiveQueue {
return &unlockedActiveQueue{
queue: queue,
}
}
// add adds a new pod to the activeQ.
// The event should show which event triggered this addition and is used for the metric recording.
// This method should be called in activeQueue.underLock().
func (uaq *unlockedActiveQueue) add(pInfo *framework.QueuedPodInfo, event string) {
uaq.queue.AddOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
}
// get returns the pod matching pInfo inside the activeQ.
// Returns false if the pInfo doesn't exist in the queue.
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
func (uaq *unlockedActiveQueue) get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
return uaq.queue.Get(pInfo)
}
// has returns if pInfo exists in the queue.
// This method should be called in activeQueue.underLock() or activeQueue.underRLock().
func (uaq *unlockedActiveQueue) has(pInfo *framework.QueuedPodInfo) bool {
return uaq.queue.Has(pInfo)
} }
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock. // activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
@ -85,6 +126,10 @@ type activeQueue struct {
// schedule. Head of heap is the highest priority pod. // schedule. Head of heap is the highest priority pod.
queue *heap.Heap[*framework.QueuedPodInfo] queue *heap.Heap[*framework.QueuedPodInfo]
// unlockedQueue is a wrapper of queue providing methods that are not locked themselves
// and can be used in the underLock() or underRLock().
unlockedQueue *unlockedActiveQueue
// cond is a condition that is notified when the pod is added to activeQ. // cond is a condition that is notified when the pod is added to activeQ.
// It is used with lock. // It is used with lock.
cond sync.Cond cond sync.Cond
@ -134,6 +179,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
inFlightEvents: list.New(), inFlightEvents: list.New(),
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled, isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
metricsRecorder: metricRecorder, metricsRecorder: metricRecorder,
unlockedQueue: newUnlockedActiveQueue(queue),
} }
aq.cond.L = &aq.lock aq.cond.L = &aq.lock
@ -146,7 +192,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) {
aq.lock.Lock() aq.lock.Lock()
defer aq.lock.Unlock() defer aq.lock.Unlock()
fn(aq.queue) fn(aq.unlockedQueue)
} }
// underLock runs the fn function under the lock.RLock. // underLock runs the fn function under the lock.RLock.
@ -155,7 +201,7 @@ func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer))
func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) { func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) {
aq.lock.RLock() aq.lock.RLock()
defer aq.lock.RUnlock() defer aq.lock.RUnlock()
fn(aq.queue) fn(aq.unlockedQueue)
} }
// update updates the pod in activeQ if oldPodInfo is already in the queue. // update updates the pod in activeQ if oldPodInfo is already in the queue.

View File

@ -34,8 +34,8 @@ func TestClose(t *testing.T) {
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr) aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr)
aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) { aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}) unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label())
unlockedActiveQ.AddOrUpdate(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}) unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}, framework.EventUnscheduledPodAdd.Label())
}) })
_, err := aq.pop(logger) _, err := aq.pop(logger)

View File

@ -43,8 +43,9 @@ type backoffQueuer interface {
podMaxBackoffDuration() time.Duration podMaxBackoffDuration() time.Duration
// add adds the pInfo to backoffQueue. // add adds the pInfo to backoffQueue.
// The event should show which event triggered this addition and is used for the metric recording.
// It also ensures that pInfo is not in both queues. // It also ensures that pInfo is not in both queues.
add(logger klog.Logger, pInfo *framework.QueuedPodInfo) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string)
// update updates the pod in backoffQueue if oldPodInfo is already in the queue. // update updates the pod in backoffQueue if oldPodInfo is already in the queue.
// It returns new pod info if updated, nil otherwise. // It returns new pod info if updated, nil otherwise.
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
@ -168,8 +169,9 @@ func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInf
} }
// add adds the pInfo to backoffQueue. // add adds the pInfo to backoffQueue.
// The event should show which event triggered this addition and is used for the metric recording.
// It also ensures that pInfo is not in both queues. // It also ensures that pInfo is not in both queues.
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) { func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) {
// If pod has empty both unschedulable plugins and pending plugins, // If pod has empty both unschedulable plugins and pending plugins,
// it means that it failed because of error and should be moved to podErrorBackoffQ. // it means that it failed because of error and should be moved to podErrorBackoffQ.
if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 { if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
@ -178,7 +180,9 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo)
err := bq.podBackoffQ.Delete(pInfo) err := bq.podBackoffQ.Delete(pInfo)
if err == nil { if err == nil {
logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podBackoffQ", "pod", klog.KObj(pInfo.Pod)) logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podBackoffQ", "pod", klog.KObj(pInfo.Pod))
return
} }
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
return return
} }
bq.podBackoffQ.AddOrUpdate(pInfo) bq.podBackoffQ.AddOrUpdate(pInfo)
@ -186,7 +190,9 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo)
err := bq.podErrorBackoffQ.Delete(pInfo) err := bq.podErrorBackoffQ.Delete(pInfo)
if err == nil { if err == nil {
logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podErrorBackoffQ", "pod", klog.KObj(pInfo.Pod)) logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podErrorBackoffQ", "pod", klog.KObj(pInfo.Pod))
return
} }
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
} }
// update updates the pod in backoffQueue if oldPodInfo is already in the queue. // update updates the pod in backoffQueue if oldPodInfo is already in the queue.

View File

@ -152,7 +152,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
logger, _ := ktesting.NewTestContext(t) logger, _ := ktesting.NewTestContext(t)
bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration) bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration)
for _, podName := range tt.podsInBackoff { for _, podName := range tt.podsInBackoff {
bq.add(logger, podInfos[podName]) bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
} }
var gotPods []string var gotPods []string
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {

View File

@ -557,13 +557,17 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
if pInfo.Gated { if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
if unlockedActiveQ.Has(pInfo) { if unlockedActiveQ.has(pInfo) {
return return
} }
if p.backoffQ.has(pInfo) { if p.backoffQ.has(pInfo) {
return return
} }
p.unschedulablePods.addOrUpdate(pInfo) if p.unschedulablePods.get(pInfo.Pod) != nil {
return
}
p.unschedulablePods.addOrUpdate(pInfo, event)
logger.V(5).Info("Pod moved to an internal scheduling queue, because the pod is gated", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", unschedulablePods)
return return
} }
if pInfo.InitialAttemptTimestamp == nil { if pInfo.InitialAttemptTimestamp == nil {
@ -571,13 +575,12 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
pInfo.InitialAttemptTimestamp = &now pInfo.InitialAttemptTimestamp = &now
} }
unlockedActiveQ.AddOrUpdate(pInfo) unlockedActiveQ.add(pInfo, event)
added = true added = true
p.unschedulablePods.delete(pInfo.Pod, gatedBefore) p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
p.backoffQ.delete(pInfo) p.backoffQ.delete(pInfo)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() { if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() {
p.AddNominatedPod(logger, pInfo.PodInfo, nil) p.AddNominatedPod(logger, pInfo.PodInfo, nil)
} }
@ -721,13 +724,11 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
// - No unschedulable plugins are associated with this Pod, // - No unschedulable plugins are associated with this Pod,
// meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue. // meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
// In this case, we should retry scheduling it because this Pod may not be retried until the next flush. // In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
p.backoffQ.add(logger, pInfo) p.backoffQ.add(logger, pInfo, framework.ScheduleAttemptFailure)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc()
} else { } else {
p.unschedulablePods.addOrUpdate(pInfo) p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc()
} }
return nil return nil
@ -933,7 +934,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
// Pod might have completed its backoff time while being in unschedulablePods, // Pod might have completed its backoff time while being in unschedulablePods,
// so we should check isPodBackingoff before moving the pod to backoffQ. // so we should check isPodBackingoff before moving the pod to backoffQ.
if p.backoffQ.isPodBackingoff(pInfo) { if p.backoffQ.isPodBackingoff(pInfo) {
p.backoffQ.add(logger, pInfo) p.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodUpdate.Label())
p.unschedulablePods.delete(pInfo.Pod, gated) p.unschedulablePods.delete(pInfo.Pod, gated)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ)
return return
@ -946,7 +947,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
} }
// Pod update didn't make it schedulable, keep it in the unschedulable queue. // Pod update didn't make it schedulable, keep it in the unschedulable queue.
p.unschedulablePods.addOrUpdate(pInfo) p.unschedulablePods.addOrUpdate(pInfo, framework.EventUnscheduledPodUpdate.Label())
return return
} }
// If pod is not in any of the queues, we put it in the active queue. // If pod is not in any of the queues, we put it in the active queue.
@ -1036,16 +1037,14 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event
// NOTE: this function assumes lock has been acquired in caller // NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, strategy queueingStrategy, event string) string { func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, strategy queueingStrategy, event string) string {
if strategy == queueSkip { if strategy == queueSkip {
p.unschedulablePods.addOrUpdate(pInfo) p.unschedulablePods.addOrUpdate(pInfo, event)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc()
return unschedulablePods return unschedulablePods
} }
// Pod might have completed its backoff time while being in unschedulablePods, // Pod might have completed its backoff time while being in unschedulablePods,
// so we should check isPodBackingoff before moving the pod to backoffQ. // so we should check isPodBackingoff before moving the pod to backoffQ.
if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) { if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) {
p.backoffQ.add(logger, pInfo) p.backoffQ.add(logger, pInfo, event)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
return backoffQ return backoffQ
} }
@ -1053,13 +1052,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
if added := p.moveToActiveQ(logger, pInfo, event); added { if added := p.moveToActiveQ(logger, pInfo, event); added {
return activeQ return activeQ
} }
if pInfo.Gated { // Pod is gated. We don't have to push it back to unschedulable queue, because moveToActiveQ should already have done that.
// In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in moveToActiveQ.
return unschedulablePods
}
p.unschedulablePods.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc()
return unschedulablePods return unschedulablePods
} }
@ -1178,7 +1171,7 @@ func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedP
} }
p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) {
pInfo, ok = unlockedActiveQ.Get(pInfoLookup) pInfo, ok = unlockedActiveQ.get(pInfoLookup)
}) })
return return
} }
@ -1205,7 +1198,7 @@ func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedAc
pod := np.toPod() pod := np.toPod()
pInfoLookup := newQueuedPodInfoForLookup(pod) pInfoLookup := newQueuedPodInfoForLookup(pod)
queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup) queuedPodInfo, exists := unlockedActiveQ.get(pInfoLookup)
if exists { if exists {
return queuedPodInfo.PodInfo return queuedPodInfo.PodInfo
} }
@ -1275,7 +1268,8 @@ type UnschedulablePods struct {
} }
// addOrUpdate adds a pod to the unschedulable podInfoMap. // addOrUpdate adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { // The event should show which event triggered the addition and is used for the metric recording.
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo, event string) {
podID := u.keyFunc(pInfo.Pod) podID := u.keyFunc(pInfo.Pod)
if _, exists := u.podInfoMap[podID]; !exists { if _, exists := u.podInfoMap[podID]; !exists {
if pInfo.Gated && u.gatedRecorder != nil { if pInfo.Gated && u.gatedRecorder != nil {
@ -1283,6 +1277,7 @@ func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) {
} else if !pInfo.Gated && u.unschedulableRecorder != nil { } else if !pInfo.Gated && u.unschedulableRecorder != nil {
u.unschedulableRecorder.Inc() u.unschedulableRecorder.Inc()
} }
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc()
} }
u.podInfoMap[podID] = pInfo u.podInfoMap[podID] = pInfo
} }

View File

@ -692,7 +692,7 @@ func Test_InFlightPods(t *testing.T) {
// Simulate a bug, putting pod into activeQ, while pod is being scheduled. // Simulate a bug, putting pod into activeQ, while pod is being scheduled.
{callback: func(t *testing.T, q *PriorityQueue) { {callback: func(t *testing.T, q *PriorityQueue) {
q.activeQ.underLock(func(unlocked unlockedActiveQueuer) { q.activeQ.underLock(func(unlocked unlockedActiveQueuer) {
unlocked.AddOrUpdate(newQueuedPodInfoForLookup(pod1)) unlocked.add(newQueuedPodInfoForLookup(pod1), framework.EventUnscheduledPodAdd.Label())
}) })
}}, }},
// At this point, in the activeQ, we have pod1 and pod3 in this order. // At this point, in the activeQ, we have pod1 and pod3 in this order.
@ -1098,7 +1098,7 @@ func TestPriorityQueue_Update(t *testing.T) {
wantQ: backoffQ, wantQ: backoffQ,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
q.backoffQ.add(logger, podInfo) q.backoffQ.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label())
return podInfo.Pod, podInfo.Pod return podInfo.Pod, podInfo.Pod
}, },
schedulingHintsEnablement: []bool{false, true}, schedulingHintsEnablement: []bool{false, true},
@ -1107,7 +1107,7 @@ func TestPriorityQueue_Update(t *testing.T) {
name: "when updating a pod which is in unschedulable queue and is backing off, it will be moved to backoff queue", name: "when updating a pod which is in unschedulable queue and is backing off, it will be moved to backoff queue",
wantQ: backoffQ, wantQ: backoffQ,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))) q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)), framework.EventUnscheduledPodAdd.Label())
updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod := medPriorityPodInfo.Pod.DeepCopy()
updatedPod.Annotations["foo"] = "test" updatedPod.Annotations["foo"] = "test"
return medPriorityPodInfo.Pod, updatedPod return medPriorityPodInfo.Pod, updatedPod
@ -1118,7 +1118,7 @@ func TestPriorityQueue_Update(t *testing.T) {
name: "when updating a pod which is in unschedulable queue and is not backing off, it will be moved to active queue", name: "when updating a pod which is in unschedulable queue and is not backing off, it will be moved to active queue",
wantQ: activeQ, wantQ: activeQ,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))) q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)), framework.EventUnscheduledPodAdd.Label())
updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod := medPriorityPodInfo.Pod.DeepCopy()
updatedPod.Annotations["foo"] = "test1" updatedPod.Annotations["foo"] = "test1"
// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
@ -1132,7 +1132,7 @@ func TestPriorityQueue_Update(t *testing.T) {
name: "when updating a pod which is in unschedulable pods but the plugin returns skip, it will remain in unschedulablePods", name: "when updating a pod which is in unschedulable pods but the plugin returns skip, it will remain in unschedulablePods",
wantQ: unschedulablePods, wantQ: unschedulablePods,
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, skipPlugin)) q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, skipPlugin), framework.EventUnscheduledPodAdd.Label())
updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod := medPriorityPodInfo.Pod.DeepCopy()
updatedPod.Annotations["foo"] = "test1" updatedPod.Annotations["foo"] = "test1"
return medPriorityPodInfo.Pod, updatedPod return medPriorityPodInfo.Pod, updatedPod
@ -1181,7 +1181,7 @@ func TestPriorityQueue_Update(t *testing.T) {
} }
q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) {
if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists { if pInfoFromActive, exists := unlockedActiveQ.get(newQueuedPodInfoForLookup(newPod)); exists {
if tt.wantQ != activeQ { if tt.wantQ != activeQ {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name)
} }
@ -1357,7 +1357,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
if tt.qPodInInFlightPod != nil { if tt.qPodInInFlightPod != nil {
// Put -> Pop the Pod to make it registered in inFlightPods. // Put -> Pop the Pod to make it registered in inFlightPods.
q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(tt.qPodInInFlightPod)) unlockedActiveQ.add(newQueuedPodInfoForLookup(tt.qPodInInFlightPod), framework.EventUnscheduledPodAdd.Label())
}) })
p, err := q.activeQ.pop(logger) p, err := q.activeQ.pop(logger)
if err != nil { if err != nil {
@ -1377,11 +1377,11 @@ func TestPriorityQueue_Activate(t *testing.T) {
} }
for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { for _, qPodInfo := range tt.qPodInfoInUnschedulablePods {
q.unschedulablePods.addOrUpdate(qPodInfo) q.unschedulablePods.addOrUpdate(qPodInfo, framework.EventUnscheduledPodAdd.Label())
} }
for _, qPodInfo := range tt.qPodInfoInBackoffQ { for _, qPodInfo := range tt.qPodInfoInBackoffQ {
q.backoffQ.add(logger, qPodInfo) q.backoffQ.add(logger, qPodInfo, framework.EventUnscheduledPodAdd.Label())
} }
// Activate specific pod according to the table // Activate specific pod according to the table
@ -2542,7 +2542,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
upm := newUnschedulablePods(nil, nil) upm := newUnschedulablePods(nil, nil)
for _, p := range test.podsToAdd { for _, p := range test.podsToAdd {
upm.addOrUpdate(newQueuedPodInfoForLookup(p)) upm.addOrUpdate(newQueuedPodInfoForLookup(p), framework.EventUnscheduledPodAdd.Label())
} }
if diff := cmp.Diff(test.expectedMapAfterAdd, upm.podInfoMap, cmpopts.IgnoreUnexported(framework.PodInfo{})); diff != "" { if diff := cmp.Diff(test.expectedMapAfterAdd, upm.podInfoMap, cmpopts.IgnoreUnexported(framework.PodInfo{})); diff != "" {
t.Errorf("Unexpected map after adding pods(-want, +got):\n%s", diff) t.Errorf("Unexpected map after adding pods(-want, +got):\n%s", diff)
@ -2550,7 +2550,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
if len(test.podsToUpdate) > 0 { if len(test.podsToUpdate) > 0 {
for _, p := range test.podsToUpdate { for _, p := range test.podsToUpdate {
upm.addOrUpdate(newQueuedPodInfoForLookup(p)) upm.addOrUpdate(newQueuedPodInfoForLookup(p), framework.EventUnscheduledPodUpdate.Label())
} }
if diff := cmp.Diff(test.expectedMapAfterUpdate, upm.podInfoMap, cmpopts.IgnoreUnexported(framework.PodInfo{})); diff != "" { if diff := cmp.Diff(test.expectedMapAfterUpdate, upm.podInfoMap, cmpopts.IgnoreUnexported(framework.PodInfo{})); diff != "" {
t.Errorf("Unexpected map after updating pods (-want, +got):\n%s", diff) t.Errorf("Unexpected map after updating pods (-want, +got):\n%s", diff)
@ -2964,7 +2964,7 @@ var (
} }
addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(pInfo) unlockedActiveQ.add(pInfo, framework.EventUnscheduledPodAdd.Label())
}) })
} }
addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
@ -2978,7 +2978,7 @@ var (
}) })
pInfo = attemptQueuedPodInfo(pInfo) pInfo = attemptQueuedPodInfo(pInfo)
} }
queue.unschedulablePods.addOrUpdate(pInfo) queue.unschedulablePods.addOrUpdate(pInfo, framework.EventUnscheduledPodAdd.Label())
} }
deletePod = func(t *testing.T, _ klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { deletePod = func(t *testing.T, _ klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.Delete(pInfo.Pod) queue.Delete(pInfo.Pod)
@ -2989,7 +2989,7 @@ var (
queue.Update(logger, pInfo.Pod, newPod) queue.Update(logger, pInfo.Pod, newPod)
} }
addPodBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { addPodBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.backoffQ.add(logger, pInfo) queue.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodAdd.Label())
} }
moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) queue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil)
@ -4144,10 +4144,10 @@ func TestPriorityQueue_GetPod(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t) logger, ctx := ktesting.NewTestContext(t)
q := NewTestQueue(ctx, newDefaultQueueSort()) q := NewTestQueue(ctx, newDefaultQueueSort())
q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(activeQPod)) unlockedActiveQ.add(newQueuedPodInfoForLookup(activeQPod), framework.EventUnscheduledPodAdd.Label())
}) })
q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod)) q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod), framework.EventUnscheduledPodAdd.Label())
q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod)) q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod), framework.EventUnscheduledPodAdd.Label())
tests := []struct { tests := []struct {
name string name string