From 4babdf80264b5aec7463cfaa861a0964107231e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Tue, 2 Sep 2025 09:35:22 +0000 Subject: [PATCH] Fix race in movePodsToActiveOrBackoffQueue --- pkg/scheduler/backend/queue/active_queue.go | 5 +++-- pkg/scheduler/backend/queue/active_queue_test.go | 4 ++-- pkg/scheduler/backend/queue/backoff_queue.go | 2 ++ pkg/scheduler/backend/queue/scheduling_queue.go | 5 +---- pkg/scheduler/backend/queue/scheduling_queue_test.go | 8 ++++---- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index 9ff54c03e9f..de722833d44 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -62,7 +62,7 @@ type unlockedActiveQueuer interface { // add adds a new pod to the activeQ. // The event should show which event triggered this addition and is used for the metric recording. // This method should be called in activeQueue.underLock(). - add(pInfo *framework.QueuedPodInfo, event string) + add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) // update updates the pod in activeQ if oldPodInfo is already in the queue. // It returns new pod info if updated, nil otherwise. update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo @@ -104,9 +104,10 @@ func newUnlockedActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], inFlight // add adds a new pod to the activeQ. // The event should show which event triggered this addition and is used for the metric recording. // This method should be called in activeQueue.underLock(). -func (uaq *unlockedActiveQueue) add(pInfo *framework.QueuedPodInfo, event string) { +func (uaq *unlockedActiveQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) { uaq.queue.AddOrUpdate(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) } // update updates the pod in activeQ if oldPodInfo is already in the queue. diff --git a/pkg/scheduler/backend/queue/active_queue_test.go b/pkg/scheduler/backend/queue/active_queue_test.go index eefa9aef402..a17642ea2f8 100644 --- a/pkg/scheduler/backend/queue/active_queue_test.go +++ b/pkg/scheduler/backend/queue/active_queue_test.go @@ -33,8 +33,8 @@ func TestClose(t *testing.T) { aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](convertLessFn(newDefaultQueueSort())), metrics.NewActivePodsRecorder()), true, rr, nil) aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label()) - unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}, framework.EventUnscheduledPodAdd.Label()) + unlockedActiveQ.add(logger, &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label()) + unlockedActiveQ.add(logger, &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}, framework.EventUnscheduledPodAdd.Label()) }) _, err := aq.pop(logger) diff --git a/pkg/scheduler/backend/queue/backoff_queue.go b/pkg/scheduler/backend/queue/backoff_queue.go index 5d20f2593ce..b6a5e04ca86 100644 --- a/pkg/scheduler/backend/queue/backoff_queue.go +++ b/pkg/scheduler/backend/queue/backoff_queue.go @@ -306,6 +306,7 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, return } metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ) return } bq.podBackoffQ.AddOrUpdate(pInfo) @@ -316,6 +317,7 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, return } metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() + logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ) } // update updates the pod in backoffQueue if oldPodInfo is already in the queue. diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 2af098adc07..5fc09d30afe 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -669,9 +669,8 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue p.unschedulablePods.delete(pInfo.Pod, gatedBefore) p.backoffQ.delete(pInfo) - unlockedActiveQ.add(pInfo, event) + unlockedActiveQ.add(logger, pInfo, event) added = true - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() { p.nominator.addNominatedPod(logger, pInfo.PodInfo, nil) } @@ -700,7 +699,6 @@ func (p *PriorityQueue) moveToBackoffQ(logger klog.Logger, pInfo *framework.Queu p.unschedulablePods.delete(pInfo.Pod, gatedBefore) p.backoffQ.add(logger, pInfo, event) - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ) return true } @@ -1239,7 +1237,6 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated()) queue := p.requeuePodWithQueueingStrategy(logger, pInfo, schedulingHint, event.Label()) - logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "queue", queue, "hint", schedulingHint) if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) { activated = true } diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index 53bbe39d5c4..60a5c5d3117 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -706,7 +706,7 @@ func Test_InFlightPods(t *testing.T) { // Simulate a bug, putting pod into activeQ, while pod is being scheduled. {callback: func(t *testing.T, q *PriorityQueue) { q.activeQ.underLock(func(unlocked unlockedActiveQueuer) { - unlocked.add(newQueuedPodInfoForLookup(pod1), framework.EventUnscheduledPodAdd.Label()) + unlocked.add(logger, newQueuedPodInfoForLookup(pod1), framework.EventUnscheduledPodAdd.Label()) }) }}, // At this point, in the activeQ, we have pod1 and pod3 in this order. @@ -1458,7 +1458,7 @@ func TestPriorityQueue_Activate(t *testing.T) { if tt.qPodInInFlightPod != nil { // Put -> Pop the Pod to make it registered in inFlightPods. q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.add(newQueuedPodInfoForLookup(tt.qPodInInFlightPod), framework.EventUnscheduledPodAdd.Label()) + unlockedActiveQ.add(logger, newQueuedPodInfoForLookup(tt.qPodInInFlightPod), framework.EventUnscheduledPodAdd.Label()) }) p, err := q.activeQ.pop(logger) if err != nil { @@ -3156,7 +3156,7 @@ var ( } addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.add(pInfo, framework.EventUnscheduledPodAdd.Label()) + unlockedActiveQ.add(logger, pInfo, framework.EventUnscheduledPodAdd.Label()) }) } addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { @@ -4352,7 +4352,7 @@ func TestPriorityQueue_GetPod(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) q := NewTestQueue(ctx, newDefaultQueueSort()) q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - unlockedActiveQ.add(newQueuedPodInfoForLookup(activeQPod), framework.EventUnscheduledPodAdd.Label()) + unlockedActiveQ.add(logger, newQueuedPodInfoForLookup(activeQPod), framework.EventUnscheduledPodAdd.Label()) }) q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod), framework.EventUnscheduledPodAdd.Label()) q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod), framework.EventUnscheduledPodAdd.Label())