Merge pull request #133838 from macsko/fix_race_in_movepodstoactiveorbackoffq

Fix race in movePodsToActiveOrBackoffQueue
This commit is contained in:
Kubernetes Prow Robot
2025-09-06 00:31:25 -07:00
committed by GitHub
5 changed files with 12 additions and 12 deletions

View File

@@ -62,7 +62,7 @@ type unlockedActiveQueuer interface {
// add adds a new pod to the activeQ.
// The event should show which event triggered this addition and is used for the metric recording.
// This method should be called in activeQueue.underLock().
add(pInfo *framework.QueuedPodInfo, event string)
add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string)
// update updates the pod in activeQ if oldPodInfo is already in the queue.
// It returns new pod info if updated, nil otherwise.
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
@@ -104,9 +104,10 @@ func newUnlockedActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], inFlight
// add adds a new pod to the activeQ.
// The event should show which event triggered this addition and is used for the metric recording.
// This method should be called in activeQueue.underLock().
func (uaq *unlockedActiveQueue) add(pInfo *framework.QueuedPodInfo, event string) {
func (uaq *unlockedActiveQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) {
uaq.queue.AddOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
}
// update updates the pod in activeQ if oldPodInfo is already in the queue.

View File

@@ -33,8 +33,8 @@ func TestClose(t *testing.T) {
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](convertLessFn(newDefaultQueueSort())), metrics.NewActivePodsRecorder()), true, rr, nil)
aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label())
unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}, framework.EventUnscheduledPodAdd.Label())
unlockedActiveQ.add(logger, &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label())
unlockedActiveQ.add(logger, &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("bar").Name("p2").UID("p2").Obj()}}, framework.EventUnscheduledPodAdd.Label())
})
_, err := aq.pop(logger)

View File

@@ -306,6 +306,7 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo,
return
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ)
return
}
bq.podBackoffQ.AddOrUpdate(pInfo)
@@ -316,6 +317,7 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo,
return
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ)
}
// update updates the pod in backoffQueue if oldPodInfo is already in the queue.

View File

@@ -669,9 +669,8 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
p.backoffQ.delete(pInfo)
unlockedActiveQ.add(pInfo, event)
unlockedActiveQ.add(logger, pInfo, event)
added = true
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
if event == framework.EventUnscheduledPodAdd.Label() || event == framework.EventUnscheduledPodUpdate.Label() {
p.nominator.addNominatedPod(logger, pInfo.PodInfo, nil)
}
@@ -700,7 +699,6 @@ func (p *PriorityQueue) moveToBackoffQ(logger klog.Logger, pInfo *framework.Queu
p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
p.backoffQ.add(logger, pInfo, event)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ)
return true
}
@@ -1239,7 +1237,6 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated())
queue := p.requeuePodWithQueueingStrategy(logger, pInfo, schedulingHint, event.Label())
logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "queue", queue, "hint", schedulingHint)
if queue == activeQ || (p.isPopFromBackoffQEnabled && queue == backoffQ) {
activated = true
}

View File

@@ -706,7 +706,7 @@ func Test_InFlightPods(t *testing.T) {
// Simulate a bug, putting pod into activeQ, while pod is being scheduled.
{callback: func(t *testing.T, q *PriorityQueue) {
q.activeQ.underLock(func(unlocked unlockedActiveQueuer) {
unlocked.add(newQueuedPodInfoForLookup(pod1), framework.EventUnscheduledPodAdd.Label())
unlocked.add(logger, newQueuedPodInfoForLookup(pod1), framework.EventUnscheduledPodAdd.Label())
})
}},
// At this point, in the activeQ, we have pod1 and pod3 in this order.
@@ -1458,7 +1458,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
if tt.qPodInInFlightPod != nil {
// Put -> Pop the Pod to make it registered in inFlightPods.
q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.add(newQueuedPodInfoForLookup(tt.qPodInInFlightPod), framework.EventUnscheduledPodAdd.Label())
unlockedActiveQ.add(logger, newQueuedPodInfoForLookup(tt.qPodInInFlightPod), framework.EventUnscheduledPodAdd.Label())
})
p, err := q.activeQ.pop(logger)
if err != nil {
@@ -3156,7 +3156,7 @@ var (
}
addPodActiveQDirectly = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.add(pInfo, framework.EventUnscheduledPodAdd.Label())
unlockedActiveQ.add(logger, pInfo, framework.EventUnscheduledPodAdd.Label())
})
}
addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
@@ -4352,7 +4352,7 @@ func TestPriorityQueue_GetPod(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
q := NewTestQueue(ctx, newDefaultQueueSort())
q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.add(newQueuedPodInfoForLookup(activeQPod), framework.EventUnscheduledPodAdd.Label())
unlockedActiveQ.add(logger, newQueuedPodInfoForLookup(activeQPod), framework.EventUnscheduledPodAdd.Label())
})
q.backoffQ.add(logger, newQueuedPodInfoForLookup(backoffQPod), framework.EventUnscheduledPodAdd.Label())
q.unschedulablePods.addOrUpdate(newQueuedPodInfoForLookup(unschedPod), framework.EventUnscheduledPodAdd.Label())