diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 44311f5b247..2001b9ed91e 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -205,7 +205,20 @@ func (sched *Scheduler) addPodToCache(obj interface{}) { logger.Error(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod)) } - sched.SchedulingQueue.AssignedPodAdded(logger, pod) + // SchedulingQueue.AssignedPodAdded has a problem: + // It internally pre-filters Pods to move to activeQ, + // while taking only in-tree plugins into consideration. + // Consequently, if custom plugins that subscribe to Pod/Add events reject Pods, + // those Pods will never be requeued to activeQ by assigned Pod related events, + // and they may be stuck in unschedulableQ. + // + // Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled. + // (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concerns.) + if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodAdd, nil, pod, nil) + } else { + sched.SchedulingQueue.AssignedPodAdded(logger, pod) + } } func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { @@ -226,7 +239,20 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) } - sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod) + // SchedulingQueue.AssignedPodUpdated has a problem: + // It internally pre-filters Pods to move to activeQ, + // while taking only in-tree plugins into consideration. + // Consequently, if custom plugins that subscribe to Pod/Update events reject Pods, + // those Pods will never be requeued to activeQ by assigned Pod related events, + // and they may be stuck in unschedulableQ. + // + // Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled. + // (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concerns.) + if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodUpdate, oldPod, newPod, nil) + } else { + sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod) + } } func (sched *Scheduler) deletePodFromCache(obj interface{}) { diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 98f7d64d28a..98367a63d55 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/internal/heap" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" @@ -1092,7 +1093,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { // may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) { p.lock.Lock() - p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd, nil, pod) + + // Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm + // because Pod related events shouldn't make Pods that were rejected by a single-node scheduling requirement schedulable. + p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), AssignedPodAdd, nil, pod) p.lock.Unlock() } @@ -1115,9 +1119,13 @@ func isPodResourcesResizedDown(pod *v1.Pod) bool { func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) { p.lock.Lock() if isPodResourcesResizedDown(newPod) { + // In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm + // because Pod related events may make Pods that were rejected by NodeResourceFit schedulable. p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil) } else { - p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod) + // Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm + // because Pod related events only make Pods rejected by a cross-topology term schedulable. + p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod) } p.lock.Unlock() } @@ -1258,22 +1266,29 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn } } -// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have -// any affinity term that matches "pod". +// getUnschedulablePodsWithCrossTopologyTerm returns unschedulable pods that meet either of the following conditions: +// - they have any affinity term that matches "pod". +// - they were rejected by the PodTopologySpread plugin. // NOTE: this function assumes lock has been acquired in caller. -func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo { +func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo { nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(logger, pod.Namespace, p.nsLister) var podsToMove []*framework.QueuedPodInfo for _, pInfo := range p.unschedulablePods.podInfoMap { + if pInfo.UnschedulablePlugins.Has(podtopologyspread.Name) && pod.Namespace == pInfo.Pod.Namespace { + // This Pod may be schedulable now by this Pod event. + podsToMove = append(podsToMove, pInfo) + continue + } + for _, term := range pInfo.RequiredAffinityTerms { if term.Matches(pod, nsLabels) { podsToMove = append(podsToMove, pInfo) break } } } + return podsToMove } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 85edacba2d6..88131fa3017 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1932,56 +1932,97 @@ func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) { // TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that // when a pod with pod affinity is in unschedulablePods and another pod with a // matching label is added, the unschedulable pod is moved to activeQ.
-func TestPriorityQueue_AssignedPodAdded(t *testing.T) { - logger, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - affinityPod := st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Priority(mediumPriority).NominatedNodeName("node1").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj() - labelPod := st.MakePod().Name("lbp").Namespace(affinityPod.Namespace).Label("service", "securityscan").Node("node1").Obj() - - c := testingclock.NewFakeClock(time.Now()) - m := makeEmptyQueueingHintMapPerProfile() - m[""][AssignedPodAdd] = []*QueueingHintFunction{ +func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { + tests := []struct { + name string + unschedPod *v1.Pod + unschedPlugin string + updatedAssignedPod *v1.Pod + wantToRequeue bool + }{ { - PluginName: "fakePlugin", - QueueingHintFn: queueHintReturnQueue, + name: "Pod rejected by pod affinity is requeued with matching Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(), + unschedPlugin: names.InterPodAffinity, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(), + wantToRequeue: true, + }, + { + name: "Pod rejected by pod affinity isn't requeued with unrelated Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(), + unschedPlugin: names.InterPodAffinity, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(), + wantToRequeue: false, + }, + { + name: "Pod rejected by pod topology spread is requeued with Pod's update in the same namespace", + unschedPod: st.MakePod().Name("tsp").Namespace("ns1").UID("tsp").SpreadConstraint(1, "node", v1.DoNotSchedule, nil, nil, nil, nil, nil).Obj(), + unschedPlugin: names.PodTopologySpread, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(), + wantToRequeue: true, + }, + { + name: "Pod rejected by pod topology spread isn't requeued with unrelated Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(), + unschedPlugin: names.PodTopologySpread, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(), + wantToRequeue: false, + }, + { + name: "Pod rejected by other plugins isn't requeued with any Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Obj(), + unschedPlugin: "fakePlugin", + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(), + wantToRequeue: false, }, } - q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) - // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. 
- q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) - if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { - t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) - } - q.activeQ.Add(q.newQueuedPodInfo(affinityPod)) - if p, err := q.Pop(logger); err != nil || p.Pod != affinityPod { - t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name) - } - q.Add(logger, medPriorityPodInfo.Pod) - err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle()) - if err != nil { - t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) - } - err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(affinityPod, "fakePlugin"), q.SchedulingCycle()) - if err != nil { - t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) - } - // Move clock to make the unschedulable pods complete backoff. - c.Step(DefaultPodInitialBackoffDuration + time.Second) - // Simulate addition of an assigned pod. The pod has matching labels for - // affinityPod. So, affinityPod should go to activeQ. - q.AssignedPodAdded(logger, labelPod) - if getUnschedulablePod(q, affinityPod) != nil { - t.Error("affinityPod is still in the unschedulablePods.") - } - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(affinityPod)); !exists { - t.Error("affinityPod is not moved to activeQ.") - } - // Check that the other pod is still in the unschedulablePods. - if getUnschedulablePod(q, unschedulablePodInfo.Pod) == nil { - t.Error("unschedulablePodInfo is not in the unschedulablePods.") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c := testingclock.NewFakeClock(time.Now()) + m := makeEmptyQueueingHintMapPerProfile() + m[""][AssignedPodAdd] = []*QueueingHintFunction{ + { + PluginName: "fakePlugin", + QueueingHintFn: queueHintReturnQueue, + }, + { + PluginName: names.InterPodAffinity, + QueueingHintFn: queueHintReturnQueue, + }, + { + PluginName: names.PodTopologySpread, + QueueingHintFn: queueHintReturnQueue, + }, + } + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) + + // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. + if err := q.activeQ.Add(q.newQueuedPodInfo(tt.unschedPod)); err != nil { + t.Errorf("failed to add pod to activeQ: %v", err) + } + if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { + t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) + } + + err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(tt.unschedPod, tt.unschedPlugin), q.SchedulingCycle()) + if err != nil { + t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + + // Move clock to make the unschedulable pods complete backoff. + c.Step(DefaultPodInitialBackoffDuration + time.Second) + + q.AssignedPodAdded(logger, tt.updatedAssignedPod) + + if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(tt.unschedPod)); exists != tt.wantToRequeue { + t.Fatalf("unexpected Pod move: Pod should be requeued: %v. 
Pod is actually requeued: %v", tt.wantToRequeue, exists) + } + }) } } diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index c0c9f258202..0ff6b0a4352 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -53,6 +53,7 @@ import ( testutils "k8s.io/kubernetes/test/integration/util" imageutils "k8s.io/kubernetes/test/utils/image" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) func TestSchedulingGates(t *testing.T) { @@ -362,6 +363,25 @@ func TestCoreResourceEnqueue(t *testing.T) { }, wantRequeuedPods: sets.New("pod1"), }, + { + name: "Pods with PodTopologySpread should be requeued when a Pod with a matching label is scheduled", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()}, + initialPod: st.MakePod().Name("pod1").Label("key", "val").Container("image").Node("fake-node").Obj(), + pods: []*v1.Pod{ + // - Pod2 will be rejected by the PodTopologySpread plugin. + st.MakePod().Name("pod2").Label("key", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + // Trigger an assigned Pod add event. + pod := st.MakePod().Name("pod3").Label("key", "val").Node("fake-node").Container("image").Obj() + if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create Pod %q: %w", pod.Name, err) + } + + return nil + }, + wantRequeuedPods: sets.New("pod2"), + }, } for _, tt := range tests {
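
For illustration, here is a minimal sketch of the kind of out-of-tree plugin the eventhandlers.go comments above refer to. It is not part of this change: "ExamplePlugin" and "isSchedulableAfterPodAdded" are hypothetical names, and the EnqueueExtensions/QueueingHintFn shapes shown are the ones used while the SchedulerQueueingHints feature gate is enabled; they may differ between releases. Without QueueingHints, AssignedPodAdded pre-filters candidates using only in-tree cross-topology terms, so Pods rejected by a plugin like this could stay stuck in unschedulableQ; with the gate enabled, MoveAllToActiveOrBackoffQueue lets the plugin's own hint decide whether to requeue.

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// ExamplePlugin stands in for a custom plugin whose Filter (omitted here)
// rejects Pods and which wants those Pods re-evaluated when an assigned Pod is added.
type ExamplePlugin struct{}

var _ framework.EnqueueExtensions = &ExamplePlugin{}

func (pl *ExamplePlugin) Name() string { return "ExamplePlugin" }

// EventsToRegister subscribes the plugin to assigned Pod additions.
// With SchedulerQueueingHints enabled, the scheduler moves Pods via
// MoveAllToActiveOrBackoffQueue on such events and consults this hint;
// the in-tree-only pre-filter in AssignedPodAdded never considers this plugin.
func (pl *ExamplePlugin) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		{
			Event:          framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add},
			QueueingHintFn: pl.isSchedulableAfterPodAdded,
		},
	}
}

// isSchedulableAfterPodAdded is deliberately conservative and requeues on every
// assigned Pod addition; a real plugin would inspect newObj (the added Pod) and
// return framework.QueueSkip when the event cannot make the pending Pod schedulable.
func (pl *ExamplePlugin) isSchedulableAfterPodAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	return framework.Queue, nil
}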