diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index 456df160d3c..58479dd836f 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -186,6 +186,11 @@ func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error { func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) { aq.lock.Lock() defer aq.lock.Unlock() + + return aq.unlockedPop(logger) +} + +func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) { for aq.queue.Len() == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the p.closed is set and the condition is broadcast, @@ -201,12 +206,22 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) return nil, err } pInfo.Attempts++ - aq.schedCycle++ // In flight, no concurrent events yet. if aq.isSchedulingQueueHintEnabled { + // If the pod is already in the map, we shouldn't overwrite the inFlightPods otherwise it'd lead to a memory leak. + // https://github.com/kubernetes/kubernetes/pull/127016 + if _, ok := aq.inFlightPods[pInfo.Pod.UID]; ok { + // Just report it as an error, but no need to stop the scheduler + // because it likely doesn't cause any visible issues from the scheduling perspective. + logger.Error(nil, "the same pod is tracked in multiple places in the scheduler, and just discard it", "pod", klog.KObj(pInfo.Pod)) + // Just ignore/discard this duplicated pod and try to pop the next one. + return aq.unlockedPop(logger) + } + aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false) aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod) } + aq.schedCycle++ // Update metrics and reset the set of unschedulable plugins for the next attempt. 
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) { diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index bcc2f544555..fb2507765f8 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -173,7 +173,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { func Test_InFlightPods(t *testing.T) { logger, _ := ktesting.NewTestContext(t) - pod := st.MakePod().Name("targetpod").UID("pod1").Obj() + pod1 := st.MakePod().Name("targetpod").UID("pod1").Obj() pod2 := st.MakePod().Name("targetpod2").UID("pod2").Obj() pod3 := st.MakePod().Name("targetpod3").UID("pod3").Obj() var poppedPod, poppedPod2 *framework.QueuedPodInfo @@ -182,8 +182,11 @@ func Test_InFlightPods(t *testing.T) { // ONLY ONE of the following should be set. eventHappens *framework.ClusterEvent podPopped *v1.Pod - podEnqueued *framework.QueuedPodInfo - callback func(t *testing.T, q *PriorityQueue) + // podCreated is the Pod that is created and inserted into the activeQ. + podCreated *v1.Pod + // podEnqueued is the Pod that is enqueued back to activeQ. + podEnqueued *framework.QueuedPodInfo + callback func(t *testing.T, q *PriorityQueue) } tests := []struct { @@ -201,10 +204,10 @@ func Test_InFlightPods(t *testing.T) { }{ { name: "when SchedulingQueueHint is disabled, inFlightPods and inFlightEvents should be empty", - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ // This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled. - {podPopped: pod}, + {podPopped: pod1}, // This event shouldn't be added to inFlightEvents because SchedulingQueueHint is disabled. 
{eventHappens: &framework.PvAdd}, }, @@ -224,18 +227,18 @@ func Test_InFlightPods(t *testing.T) { { name: "Pod and interested events are registered in inFlightPods/inFlightEvents", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. {eventHappens: &framework.PvcAdd}, - {podPopped: pod}, + {podPopped: pod1}, // This gets added for the pod. {eventHappens: &framework.PvAdd}, // This doesn't get added because no plugin is interested in framework.PvUpdate. {eventHappens: &framework.PvUpdate}, }, - wantInFlightPods: []*v1.Pod{pod}, - wantInFlightEvents: []interface{}{pod, framework.PvAdd}, + wantInFlightPods: []*v1.Pod{pod1}, + wantInFlightEvents: []interface{}{pod1, framework.PvAdd}, queueingHintMap: QueueingHintMapPerProfile{ "": { framework.PvAdd: { @@ -250,16 +253,16 @@ func Test_InFlightPods(t *testing.T) { { name: "Pod, registered in inFlightPods, is enqueued back to activeQ", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod, pod2}, + initialPods: []*v1.Pod{pod1, pod2}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. {eventHappens: &framework.PvcAdd}, - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.PvAdd}, {podPopped: pod2}, {eventHappens: &framework.NodeAdd}, // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. - {podEnqueued: newQueuedPodInfoForLookup(pod)}, + {podEnqueued: newQueuedPodInfoForLookup(pod1)}, }, wantBackoffQPodNames: []string{"targetpod"}, wantInFlightPods: []*v1.Pod{pod2}, // only pod2 is registered because pod is already enqueued back. 
@@ -290,16 +293,16 @@ func Test_InFlightPods(t *testing.T) { { name: "All Pods registered in inFlightPods are enqueued back to activeQ", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod, pod2}, + initialPods: []*v1.Pod{pod1, pod2}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. {eventHappens: &framework.PvcAdd}, - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.PvAdd}, {podPopped: pod2}, {eventHappens: &framework.NodeAdd}, // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. - {podEnqueued: newQueuedPodInfoForLookup(pod)}, + {podEnqueued: newQueuedPodInfoForLookup(pod1)}, {eventHappens: &framework.CSINodeUpdate}, // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. {podEnqueued: newQueuedPodInfoForLookup(pod2)}, @@ -338,11 +341,11 @@ func Test_InFlightPods(t *testing.T) { { name: "One intermediate Pod registered in inFlightPods is enqueued back to activeQ", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod, pod2, pod3}, + initialPods: []*v1.Pod{pod1, pod2, pod3}, actions: []action{ // This won't be added to inFlightEvents because no inFlightPods at this point. 
{eventHappens: &framework.PvcAdd}, - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.PvAdd}, {podPopped: pod2}, {eventHappens: &framework.NodeAdd}, @@ -352,8 +355,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod2)}, }, wantBackoffQPodNames: []string{"targetpod2"}, - wantInFlightPods: []*v1.Pod{pod, pod3}, - wantInFlightEvents: []interface{}{pod, framework.PvAdd, framework.NodeAdd, pod3, framework.AssignedPodAdd}, + wantInFlightPods: []*v1.Pod{pod1, pod3}, + wantInFlightEvents: []interface{}{pod1, framework.PvAdd, framework.NodeAdd, pod3, framework.AssignedPodAdd}, queueingHintMap: QueueingHintMapPerProfile{ "": { framework.PvAdd: { @@ -379,11 +382,11 @@ func Test_InFlightPods(t *testing.T) { }, { name: "pod is enqueued to queue without QueueingHint when SchedulingQueueHint is disabled", - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.AssignedPodAdd}, - {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, + {podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")}, }, wantBackoffQPodNames: []string{"targetpod"}, wantInFlightPods: nil, @@ -404,13 +407,13 @@ func Test_InFlightPods(t *testing.T) { { name: "events before popping Pod are ignored when Pod is enqueued back to queue", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ {eventHappens: &framework.WildCardEvent}, - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.AssignedPodAdd}, // This Pod won't be requeued to activeQ/backoffQ because fooPlugin1 returns QueueSkip. 
- {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, + {podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, wantInFlightPods: nil, @@ -431,11 +434,11 @@ func Test_InFlightPods(t *testing.T) { { name: "pod is enqueued to backoff if no failed plugin", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.AssignedPodAdd}, - {podEnqueued: newQueuedPodInfoForLookup(pod)}, + {podEnqueued: newQueuedPodInfoForLookup(pod1)}, }, wantBackoffQPodNames: []string{"targetpod"}, wantInFlightPods: nil, @@ -455,11 +458,11 @@ func Test_InFlightPods(t *testing.T) { { name: "pod is enqueued to unschedulable pod pool if no events that can make the pod schedulable", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.NodeAdd}, - {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, + {podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, wantInFlightPods: nil, @@ -480,11 +483,11 @@ func Test_InFlightPods(t *testing.T) { { name: "pod is enqueued to unschedulable pod pool because the failed plugin has a hint fn but it returns Skip", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.AssignedPodAdd}, - {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, + {podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, wantInFlightPods: nil, @@ -505,12 +508,12 @@ func Test_InFlightPods(t *testing.T) { { name: "pod is enqueued to activeQ because the Pending plugins has a hint fn and it returns Queue", 
isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.AssignedPodAdd}, {podEnqueued: &framework.QueuedPodInfo{ - PodInfo: mustNewPodInfo(pod), + PodInfo: mustNewPodInfo(pod1), UnschedulablePlugins: sets.New("fooPlugin2", "fooPlugin3"), PendingPlugins: sets.New("fooPlugin1"), }}, @@ -541,11 +544,11 @@ func Test_InFlightPods(t *testing.T) { { name: "pod is enqueued to backoffQ because the failed plugin has a hint fn and it returns Queue", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ - {podPopped: pod}, + {podPopped: pod1}, {eventHappens: &framework.AssignedPodAdd}, - {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")}, + {podEnqueued: newQueuedPodInfoForLookup(pod1, "fooPlugin1", "fooPlugin2")}, }, wantBackoffQPodNames: []string{"targetpod"}, wantInFlightPods: nil, @@ -570,9 +573,9 @@ func Test_InFlightPods(t *testing.T) { { name: "pod is enqueued to activeQ because the failed plugin has a hint fn and it returns Queue for a concurrent event that was received while some other pod was in flight", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod, pod2}, + initialPods: []*v1.Pod{pod1, pod2}, actions: []action{ - {callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }}, + {callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod1) }}, {eventHappens: &framework.NodeAdd}, {callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, logger, q, pod2) }}, {eventHappens: &framework.AssignedPodAdd}, @@ -622,9 +625,9 @@ func Test_InFlightPods(t *testing.T) { { name: "popped pod must have empty UnschedulablePlugins and PendingPlugins", isSchedulingQueueHintEnabled: true, - initialPods: []*v1.Pod{pod}, + initialPods: []*v1.Pod{pod1}, actions: []action{ - {callback: 
func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }}, + {callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod1) }}, {callback: func(t *testing.T, q *PriorityQueue) { logger, _ := ktesting.NewTestContext(t) // Unschedulable due to PendingPlugins. @@ -636,7 +639,7 @@ func Test_InFlightPods(t *testing.T) { }}, {eventHappens: &framework.PvAdd}, // Active again. {callback: func(t *testing.T, q *PriorityQueue) { - poppedPod = popPod(t, logger, q, pod) + poppedPod = popPod(t, logger, q, pod1) if len(poppedPod.UnschedulablePlugins) > 0 { t.Errorf("QueuedPodInfo from Pop should have empty UnschedulablePlugins, got instead: %+v", poppedPod) } @@ -661,6 +664,76 @@ func Test_InFlightPods(t *testing.T) { }, }, }, + { + // This scenario shouldn't happen unless we introduce a bug similar to https://github.com/kubernetes/kubernetes/issues/118226. + // But, given the bug could make a serious memory leak and likely would be hard to detect, + // we should have a safeguard against the same bug so that, at least, we can prevent the memory leak. + name: "Pop is made twice for the same Pod, but the cleanup still happen correctly", + isSchedulingQueueHintEnabled: true, + initialPods: []*v1.Pod{pod1, pod2}, + actions: []action{ + // This won't be added to inFlightEvents because no inFlightPods at this point. + {eventHappens: &framework.PvcAdd}, + {podPopped: pod1}, + {eventHappens: &framework.PvAdd}, + {podPopped: pod2}, + // Simulate a bug, putting pod into activeQ, while pod is being scheduled. + {callback: func(t *testing.T, q *PriorityQueue) { + q.activeQ.underLock(func(unlocked unlockedActiveQueuer) { + unlocked.AddOrUpdate(newQueuedPodInfoForLookup(pod1)) + }) + }}, + // At this point, in the activeQ, we have pod1 and pod3 in this order. + {podCreated: pod3}, + // pod3 is popped, not pod1. + // In detail, this Pop() first tries to pop pod1, but it's already being scheduled and hence discarded.
+ // Then, it pops the next pod, pod3. + {podPopped: pod3}, + {callback: func(t *testing.T, q *PriorityQueue) { + // Make sure that pod1 is discarded and hence no pod in activeQ. + if len(q.activeQ.list()) != 0 { + t.Fatalf("activeQ should be empty, but got: %v", q.activeQ.list()) + } + }}, + {eventHappens: &framework.NodeAdd}, + // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. + {podEnqueued: newQueuedPodInfoForLookup(pod1)}, + {eventHappens: &framework.CSINodeUpdate}, + // This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. + {podEnqueued: newQueuedPodInfoForLookup(pod2)}, + {podEnqueued: newQueuedPodInfoForLookup(pod3)}, + }, + wantBackoffQPodNames: []string{"targetpod", "targetpod2", "targetpod3"}, + wantInFlightPods: nil, // should be empty + queueingHintMap: QueueingHintMapPerProfile{ + "": { + framework.PvAdd: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueue, + }, + }, + framework.NodeAdd: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueue, + }, + }, + framework.PvcAdd: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueue, + }, + }, + framework.CSINodeUpdate: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueue, + }, + }, + }, + }, + }, } for _, test := range tests { @@ -692,6 +765,8 @@ func Test_InFlightPods(t *testing.T) { for _, action := range test.actions { switch { + case action.podCreated != nil: + q.Add(logger, action.podCreated) case action.podPopped != nil: popPod(t, logger, q, action.podPopped) case action.eventHappens != nil: