diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index a16cccd6f1b..66e7334f61a 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -989,10 +989,10 @@ func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) { // may make pending pods with matching affinity terms schedulable. func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) { p.lock.Lock() - if event.Resource == framework.Pod && event.ActionType&framework.UpdatePodScaleDown != 0 { + if (framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodScaleDown}.Match(event)) { // In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm // because Pod related events may make Pods that were rejected by NodeResourceFit schedulable. - p.moveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodUpdate, oldPod, newPod, nil) + p.moveAllToActiveOrBackoffQueue(logger, event, oldPod, newPod, nil) } else { // Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm // because Pod related events only make Pods rejected by cross topology term schedulable. diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index 83d9e707da7..aff395abf00 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -2053,6 +2053,121 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { } } +func TestPriorityQueue_AssignedPodUpdated(t *testing.T) { + metrics.Register() + tests := []struct { + name string + unschedPod *v1.Pod + unschedPlugin string + updatedAssignedPod *v1.Pod + event framework.ClusterEvent + wantToRequeue bool + }{ + { + name: "Pod rejected by pod affinity is requeued with matching Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(), + unschedPlugin: names.InterPodAffinity, + event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodLabel}, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(), + wantToRequeue: true, + }, + { + name: "Pod rejected by pod affinity isn't requeued with unrelated Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(), + unschedPlugin: names.InterPodAffinity, + event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodLabel}, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(), + wantToRequeue: false, + }, + { + name: "Pod rejected by pod topology spread is requeued with Pod's update in the same namespace", + unschedPod: st.MakePod().Name("tsp").Namespace("ns1").UID("tsp").SpreadConstraint(1, "node", v1.DoNotSchedule, nil, nil, nil, nil, nil).Obj(), + unschedPlugin: names.PodTopologySpread, + event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodLabel}, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(), + wantToRequeue: true, + }, + { + name: "Pod rejected by pod topology spread isn't requeued with unrelated Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(), + unschedPlugin: names.PodTopologySpread, + event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodLabel}, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(), + wantToRequeue: false, + }, + { + name: "Pod rejected by resource fit is requeued with assigned Pod's scale down", + unschedPod: st.MakePod().Name("rp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), + unschedPlugin: names.NodeResourcesFit, + event: framework.ClusterEvent{Resource: framework.EventResource("AssignedPod"), ActionType: framework.UpdatePodScaleDown}, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns2").Node("node1").Obj(), + wantToRequeue: true, + }, + { + name: "Pod rejected by other plugins isn't requeued with any Pod's update", + unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Obj(), + unschedPlugin: "fakePlugin", + event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodLabel}, + updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(), + wantToRequeue: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c := testingclock.NewFakeClock(time.Now()) + m := makeEmptyQueueingHintMapPerProfile() + m[""] = map[framework.ClusterEvent][]*QueueingHintFunction{ + {Resource: framework.Pod, ActionType: framework.UpdatePodLabel}: { + { + PluginName: "fakePlugin", + QueueingHintFn: queueHintReturnQueue, + }, + { + PluginName: names.InterPodAffinity, + QueueingHintFn: queueHintReturnQueue, + }, + { + PluginName: names.PodTopologySpread, + QueueingHintFn: queueHintReturnQueue, + }, + }, + {Resource: framework.Pod, ActionType: framework.UpdatePodScaleDown}: { + { + PluginName: names.NodeResourcesFit, + QueueingHintFn: queueHintReturnQueue, + }, + }, + } + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) + + // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. + q.Add(logger, tt.unschedPod) + if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { + t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) + } + + err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(tt.unschedPod, tt.unschedPlugin), q.SchedulingCycle()) + if err != nil { + t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + + // Move clock to make the unschedulable pods complete backoff. + c.Step(DefaultPodInitialBackoffDuration + time.Second) + + q.AssignedPodUpdated(logger, nil, tt.updatedAssignedPod, tt.event) + + if q.activeQ.has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { + t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) + } + }) + } +} + func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} logger, ctx := ktesting.NewTestContext(t) diff --git a/pkg/scheduler/framework/types_test.go b/pkg/scheduler/framework/types_test.go index 1b1d81bd561..ed354387dae 100644 --- a/pkg/scheduler/framework/types_test.go +++ b/pkg/scheduler/framework/types_test.go @@ -1709,6 +1709,12 @@ func TestCloudEvent_Match(t *testing.T) { comingEvent: ClusterEvent{Resource: Pod, ActionType: UpdateNodeLabel}, wantResult: true, }, + { + name: "event with resource = 'Pod' matching with coming events carries unschedulablePod", + event: ClusterEvent{Resource: Pod, ActionType: UpdateNodeLabel | UpdateNodeTaint}, + comingEvent: ClusterEvent{Resource: unschedulablePod, ActionType: UpdateNodeLabel}, + wantResult: true, + }, { name: "event with resource = '*' matching with coming events carries same actionType", event: ClusterEvent{Resource: WildCard, ActionType: UpdateNodeLabel},