diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index bb53bf18dc7..207f052962c 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -61,14 +61,55 @@ type activeQueuer interface { // underLock() method should be used to protect these methods. type unlockedActiveQueuer interface { unlockedActiveQueueReader - AddOrUpdate(pInfo *framework.QueuedPodInfo) + // add adds a new pod to the activeQ. + // The event should show which event triggered this addition and is used for the metric recording. + // This method should be called in activeQueue.underLock(). + add(pInfo *framework.QueuedPodInfo, event string) } // unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself. // underLock() or underRLock() method should be used to protect these methods. type unlockedActiveQueueReader interface { - Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) - Has(pInfo *framework.QueuedPodInfo) bool + // get returns the pod matching pInfo inside the activeQ. + // Returns false if the pInfo doesn't exist in the queue. + // This method should be called in activeQueue.underLock() or activeQueue.underRLock(). + get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) + // has returns if pInfo exists in the queue. + // This method should be called in activeQueue.underLock() or activeQueue.underRLock(). + has(pInfo *framework.QueuedPodInfo) bool +} + +// unlockedActiveQueue defines activeQ methods that are not protected by the lock itself. +// activeQueue.underLock() or activeQueue.underRLock() method should be used to protect these methods. +type unlockedActiveQueue struct { + queue *heap.Heap[*framework.QueuedPodInfo] +} + +func newUnlockedActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo]) *unlockedActiveQueue { + return &unlockedActiveQueue{ + queue: queue, + } +} + +// add adds a new pod to the activeQ. +// The event should show which event triggered this addition and is used for the metric recording. +// This method should be called in activeQueue.underLock(). +func (uaq *unlockedActiveQueue) add(pInfo *framework.QueuedPodInfo, event string) { + uaq.queue.AddOrUpdate(pInfo) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() +} + +// get returns the pod matching pInfo inside the activeQ. +// Returns false if the pInfo doesn't exist in the queue. +// This method should be called in activeQueue.underLock() or activeQueue.underRLock(). +func (uaq *unlockedActiveQueue) get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) { + return uaq.queue.Get(pInfo) +} + +// has returns if pInfo exists in the queue. +// This method should be called in activeQueue.underLock() or activeQueue.underRLock(). +func (uaq *unlockedActiveQueue) has(pInfo *framework.QueuedPodInfo) bool { + return uaq.queue.Has(pInfo) } // activeQueue implements activeQueuer. All of the fields have to be protected using the lock. @@ -85,6 +126,10 @@ type activeQueue struct { // schedule. Head of heap is the highest priority pod. queue *heap.Heap[*framework.QueuedPodInfo] + // unlockedQueue is a wrapper of queue providing methods that are not locked themselves + // and can be used in the underLock() or underRLock(). + unlockedQueue *unlockedActiveQueue + // cond is a condition that is notified when the pod is added to activeQ. // It is used with lock. cond sync.Cond @@ -134,6 +179,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu inFlightEvents: list.New(), isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled, metricsRecorder: metricRecorder, + unlockedQueue: newUnlockedActiveQueue(queue), } aq.cond.L = &aq.lock @@ -146,7 +192,7 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { aq.lock.Lock() defer aq.lock.Unlock() - fn(aq.queue) + fn(aq.unlockedQueue) } // underLock runs the fn function under the lock.RLock. @@ -155,7 +201,7 @@ func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) { aq.lock.RLock() defer aq.lock.RUnlock() - fn(aq.queue) + fn(aq.unlockedQueue) } // update updates the pod in activeQ if oldPodInfo is already in the queue. diff --git a/pkg/scheduler/backend/queue/active_queue_test.go b/pkg/scheduler/backend/queue/active_queue_test.go index cbe5b27d37c..2c567731566 100644 --- a/pkg/scheduler/backend/queue/active_queue_test.go +++ b/pkg/scheduler/backend/queue/active_queue_test.go @@ -34,8 +34,8 @@ func TestClose(t *testing.T) { aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr) aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}) - unlockedActiveQ.AddOrUpdate(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}) + unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label()) + unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}, framework.EventUnscheduledPodAdd.Label()) }) _, err := aq.pop(logger) diff --git a/pkg/scheduler/backend/queue/backoff_queue.go b/pkg/scheduler/backend/queue/backoff_queue.go index 2af772a3664..27663c359ae 100644 --- a/pkg/scheduler/backend/queue/backoff_queue.go +++ b/pkg/scheduler/backend/queue/backoff_queue.go @@ -43,8 +43,9 @@ type backoffQueuer interface { podMaxBackoffDuration() time.Duration // add adds the pInfo to backoffQueue. + // The event should show which event triggered this addition and is used for the metric recording. // It also ensures that pInfo is not in both queues. - add(logger klog.Logger, pInfo *framework.QueuedPodInfo) + add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) // update updates the pod in backoffQueue if oldPodInfo is already in the queue. // It returns new pod info if updated, nil otherwise. update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo @@ -168,8 +169,9 @@ func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInf } // add adds the pInfo to backoffQueue. +// The event should show which event triggered this addition and is used for the metric recording. // It also ensures that pInfo is not in both queues. -func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) { +func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) { // If pod has empty both unschedulable plugins and pending plugins, // it means that it failed because of error and should be moved to podErrorBackoffQ. if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 { @@ -178,7 +180,9 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) err := bq.podBackoffQ.Delete(pInfo) if err == nil { logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podBackoffQ", "pod", klog.KObj(pInfo.Pod)) + return } + metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() return } bq.podBackoffQ.AddOrUpdate(pInfo) @@ -186,7 +190,9 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo) err := bq.podErrorBackoffQ.Delete(pInfo) if err == nil { logger.Error(nil, "BackoffQueue add() was called with a pod that was already in the podErrorBackoffQ", "pod", klog.KObj(pInfo.Pod)) + return } + metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() } // update updates the pod in backoffQueue if oldPodInfo is already in the queue. diff --git a/pkg/scheduler/backend/queue/backoff_queue_test.go b/pkg/scheduler/backend/queue/backoff_queue_test.go index d0468322c87..1a93a7ee32d 100644 --- a/pkg/scheduler/backend/queue/backoff_queue_test.go +++ b/pkg/scheduler/backend/queue/backoff_queue_test.go @@ -152,7 +152,7 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) { logger, _ := ktesting.NewTestContext(t) bq := newBackoffQueue(fakeClock, DefaultPodInitialBackoffDuration, DefaultPodMaxBackoffDuration) for _, podName := range tt.podsInBackoff { - bq.add(logger, podInfos[podName]) + bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label()) } var gotPods []string bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) { diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 99d35698a87..3b85a296bce 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -557,13 +557,17 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { if pInfo.Gated { // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. - if unlockedActiveQ.Has(pInfo) { + if unlockedActiveQ.has(pInfo) { return } if p.backoffQ.has(pInfo) { return } - p.unschedulablePods.addOrUpdate(pInfo) + if p.unschedulablePods.get(pInfo.Pod) != nil { + return + } + p.unschedulablePods.addOrUpdate(pInfo, event) + logger.V(5).Info("Pod moved to an internal scheduling queue, because the pod is gated", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", unschedulablePods) return } if pInfo.InitialAttemptTimestamp == nil { @@ -571,13 +575,12 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue pInfo.InitialAttemptTimestamp = &now } - unlockedActiveQ.AddOrUpdate(pInfo) + unlockedActiveQ.add(pInfo, event) added = true p.unschedulablePods.delete(pInfo.Pod, gatedBefore) p.backoffQ.delete(pInfo) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) - metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() { p.AddNominatedPod(logger, pInfo.PodInfo, nil) } @@ -721,13 +724,11 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, // - No unschedulable plugins are associated with this Pod, // meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue. // In this case, we should retry scheduling it because this Pod may not be retried until the next flush. - p.backoffQ.add(logger, pInfo) + p.backoffQ.add(logger, pInfo, framework.ScheduleAttemptFailure) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", backoffQ) - metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", framework.ScheduleAttemptFailure).Inc() } else { - p.unschedulablePods.addOrUpdate(pInfo) + p.unschedulablePods.addOrUpdate(pInfo, framework.ScheduleAttemptFailure) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", unschedulablePods) - metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc() } return nil @@ -933,7 +934,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { // Pod might have completed its backoff time while being in unschedulablePods, // so we should check isPodBackingoff before moving the pod to backoffQ. if p.backoffQ.isPodBackingoff(pInfo) { - p.backoffQ.add(logger, pInfo) + p.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodUpdate.Label()) p.unschedulablePods.delete(pInfo.Pod, gated) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", framework.EventUnscheduledPodUpdate.Label(), "queue", backoffQ) return @@ -946,7 +947,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { } // Pod update didn't make it schedulable, keep it in the unschedulable queue. - p.unschedulablePods.addOrUpdate(pInfo) + p.unschedulablePods.addOrUpdate(pInfo, framework.EventUnscheduledPodUpdate.Label()) return } // If pod is not in any of the queues, we put it in the active queue. @@ -1036,16 +1037,14 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event // NOTE: this function assumes lock has been acquired in caller func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, strategy queueingStrategy, event string) string { if strategy == queueSkip { - p.unschedulablePods.addOrUpdate(pInfo) - metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc() + p.unschedulablePods.addOrUpdate(pInfo, event) return unschedulablePods } // Pod might have completed its backoff time while being in unschedulablePods, // so we should check isPodBackingoff before moving the pod to backoffQ. if strategy == queueAfterBackoff && p.backoffQ.isPodBackingoff(pInfo) { - p.backoffQ.add(logger, pInfo) - metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() + p.backoffQ.add(logger, pInfo, event) return backoffQ } @@ -1053,13 +1052,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra if added := p.moveToActiveQ(logger, pInfo, event); added { return activeQ } - if pInfo.Gated { - // In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in moveToActiveQ. - return unschedulablePods - } - - p.unschedulablePods.addOrUpdate(pInfo) - metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", framework.ScheduleAttemptFailure).Inc() + // Pod is gated. We don't have to push it back to unschedulable queue, because moveToActiveQ should already have done that. return unschedulablePods } @@ -1178,7 +1171,7 @@ func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedP } p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { - pInfo, ok = unlockedActiveQ.Get(pInfoLookup) + pInfo, ok = unlockedActiveQ.get(pInfoLookup) }) return } @@ -1205,7 +1198,7 @@ func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedAc pod := np.toPod() pInfoLookup := newQueuedPodInfoForLookup(pod) - queuedPodInfo, exists := unlockedActiveQ.Get(pInfoLookup) + queuedPodInfo, exists := unlockedActiveQ.get(pInfoLookup) if exists { return queuedPodInfo.PodInfo } @@ -1275,7 +1268,8 @@ type UnschedulablePods struct { } // addOrUpdate adds a pod to the unschedulable podInfoMap. -func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { +// The event should show which event triggered the addition and is used for the metric recording. +func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo, event string) { podID := u.keyFunc(pInfo.Pod) if _, exists := u.podInfoMap[podID]; !exists { if pInfo.Gated && u.gatedRecorder != nil { @@ -1283,6 +1277,7 @@ func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { } else if !pInfo.Gated && u.unschedulableRecorder != nil { u.unschedulableRecorder.Inc() } + metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc() } u.podInfoMap[podID] = pInfo } diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index 89fecf601ef..aaf04e66c0c 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -692,7 +692,7 @@ func Test_InFlightPods(t *testing.T) { // Simulate a bug, putting pod into activeQ, while pod is being scheduled. {callback: func(t *testing.T, q *PriorityQueue) { q.activeQ.underLock(func(unlocked unlockedActiveQueuer) { - unlocked.AddOrUpdate(newQueuedPodInfoForLookup(pod1)) + unlocked.add(newQueuedPodInfoForLookup(pod1), framework.EventUnscheduledPodAdd.Label()) }) }}, // At this point, in the activeQ, we have pod1 and pod3 in this order. @@ -1098,7 +1098,7 @@ func TestPriorityQueue_Update(t *testing.T) { wantQ: backoffQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) - q.backoffQ.add(logger, podInfo) + q.backoffQ.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label()) return podInfo.Pod, podInfo.Pod }, schedulingHintsEnablement: []bool{false, true}, @@ -1107,7 +1107,7 @@ func TestPriorityQueue_Update(t *testing.T) { name: "when updating a pod which is in unschedulable queue and is backing off, it will be moved to backoff queue", wantQ: backoffQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))) + q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)), framework.EventUnscheduledPodAdd.Label()) updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.Annotations["foo"] = "test" return medPriorityPodInfo.Pod, updatedPod @@ -1118,7 +1118,7 @@ func TestPriorityQueue_Update(t *testing.T) { name: "when updating a pod which is in unschedulable queue and is not backing off, it will be moved to active queue", wantQ: activeQ, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin))) + q.unschedulablePods.addOrUpdate(attemptQueuedPodInfo(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)), framework.EventUnscheduledPodAdd.Label()) updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.Annotations["foo"] = "test1" // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, @@ -1132,7 +1132,7 @@ func TestPriorityQueue_Update(t *testing.T) { name: "when updating a pod which is in unschedulable pods but the plugin returns skip, it will remain in unschedulablePods", wantQ: unschedulablePods, prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { - q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, skipPlugin)) + q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, skipPlugin), framework.EventUnscheduledPodAdd.Label()) updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.Annotations["foo"] = "test1" return medPriorityPodInfo.Pod, updatedPod @@ -1181,7 +1181,7 @@ func TestPriorityQueue_Update(t *testing.T) { } q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { - if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if pInfoFromActive, exists := unlockedActiveQ.get(newQueuedPodInfoForLookup(newPod)); exists { if tt.wantQ != activeQ { t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) } @@ -1357,7 +1357,7 @@ func TestPriorityQueue_Activate(t *testing.T) { if tt.qPodInInFlightPod != nil { // Put -> Pop the Pod to make it registered in inFlightPods. q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(tt.qPodInInFlightPod)) + unlockedActiveQ.add(newQueuedPodInfoForLookup(tt.qPodInInFlightPod), framework.EventUnscheduledPodAdd.Label()) }) p, err := q.activeQ.pop(logger) if err != nil { @@ -1377,11 +1377,11 @@ func TestPriorityQueue_Activate(t *testing.T) { } for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { - q.unschedulablePods.addOrUpdate(qPodInfo) + q.unschedulablePods.addOrUpdate(qPodInfo, framework.EventUnscheduledPodAdd.Label()) } for _, qPodInfo := range tt.qPodInfoInBackoffQ { - q.backoffQ.add(logger, qPodInfo) + q.backoffQ.add(logger, qPodInfo, framework.EventUnscheduledPodAdd.Label()) } // Activate specific pod according to the table @@ -2542,7 +2542,7 @@ func TestUnschedulablePodsMap(t *testing.T) { t.Run(test.name, func(t *testing.T) { upm := newUnschedulablePods(nil, nil) for _, p := range test.podsToAdd { - upm.addOrUpdate(newQueuedPodInfoForLookup(p)) + upm.addOrUpdate(newQueuedPodInfoForLookup(p), framework.EventUnscheduledPodAdd.Label()) } if diff := cmp.Diff(test.expectedMapAfterAdd, upm.podInfoMap, cmpopts.IgnoreUnexported(framework.PodInfo{})); diff != "" { t.Errorf("Unexpected map after adding pods(-want, +got):\n%s", diff) @@ -2550,7 +2550,7 @@ func TestUnschedulablePodsMap(t *testing.T) { if len(test.podsToUpdate) > 0 { for _, p := range test.podsToUpdate { - upm.addOrUpdate(newQueuedPodInfoForLookup(p)) + upm.addOrUpdate(newQueuedPodInfoForLookup(p), framework.EventUnscheduledPodUpdate.Label()) } if diff := cmp.Diff(test.expectedMapAfterUpdate, upm.podInfoMap, cmpopts.IgnoreUnexported(framework.PodInfo{})); diff != "" { t.Errorf("Unexpected map after updating pods (-want, +got):\n%s", diff) @@ -2964,7 +2964,7 @@ var ( } addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(pInfo) + unlockedActiveQ.add(pInfo, framework.EventUnscheduledPodAdd.Label()) }) } addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { @@ -2978,7 +2978,7 @@ var ( }) pInfo = attemptQueuedPodInfo(pInfo) } - queue.unschedulablePods.addOrUpdate(pInfo) + queue.unschedulablePods.addOrUpdate(pInfo, framework.EventUnscheduledPodAdd.Label()) } deletePod = func(t *testing.T, _ klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { queue.Delete(pInfo.Pod) @@ -2989,7 +2989,7 @@ var ( queue.Update(logger, pInfo.Pod, newPod) } addPodBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.backoffQ.add(logger, pInfo) + queue.backoffQ.add(logger, pInfo, framework.EventUnscheduledPodAdd.Label()) } moveAllToActiveOrBackoffQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.MoveAllToActiveOrBackoffQueue(logger, framework.EventUnschedulableTimeout, nil, nil, nil) @@ -4144,10 +4144,10 @@ func TestPriorityQueue_GetPod(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) q := NewTestQueue(ctx, newDefaultQueueSort()) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(activeQPod)) + unlockedActiveQ.add(newQueuedPodInfoForLookup(activeQPod), framework.EventUnscheduledPodAdd.Label()) }) - q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod)) - q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod)) + q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod), framework.EventUnscheduledPodAdd.Label()) + q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod), framework.EventUnscheduledPodAdd.Label()) tests := []struct { name string