diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 657ce6481c0..fac98d0fcfd 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -165,9 +165,26 @@ type PriorityQueue struct { // inFlightPods holds the UID of all pods which have been popped out for which Done // hasn't been called yet - in other words, all pods that are currently being // processed (being scheduled, in permit, or in the binding cycle). - inFlightPods map[types.UID]inFlightPod - // receivedEvents holds the events received by the scheduling queue. - receivedEvents *list.List + // + // The values in the map are the entry of each pod in the inFlightEvents list. + // The value of that entry is the *v1.Pod at the time that scheduling of that + // pod started, which can be useful for logging or debugging. + inFlightPods map[types.UID]*list.Element + + // inFlightEvents holds the events received by the scheduling queue + // (entry value is clusterEvent) together with in-flight pods (entry + // value is *v1.Pod). Entries get added at the end while the mutex is + // locked, so they get serialized. + // + // The pod entries are added in Pop and used to track which events + // occurred after the pod scheduling attempt for that pod started. + // They get removed when the scheduling attempt is done, at which + // point all events that occurred in the meantime are processed. + // + // After removal of a pod, events at the start of the list are no + // longer needed because all of the other in-flight pods started + // later. Those events can be removed. + inFlightEvents *list.List // activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. @@ -212,11 +229,6 @@ type QueueingHintFunction struct { QueueingHintFn framework.QueueingHintFn } -type inFlightPod struct { - // previousEvent is the latest observed event when the pod is popped. - previousEvent *list.Element -} - // clusterEvent has the event and involved objects. type clusterEvent struct { event framework.ClusterEvent @@ -224,11 +236,6 @@ type clusterEvent struct { oldObj interface{} // newObj is the object that involved this event. newObj interface{} - - // inFlightPodsNum is the counter of pods referring to this cluster event. - // It is initialized with the number of Pods being scheduled when the event is received, - // and is decremented when the scheduling for those Pods are Done(). - inFlightPodsNum int } type priorityQueueOptions struct { @@ -364,8 +371,8 @@ func NewPriorityQueue( podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), - inFlightPods: make(map[types.UID]inFlightPod), - receivedEvents: list.New(), + inFlightPods: make(map[types.UID]*list.Element), + inFlightEvents: list.New(), preEnqueuePluginMap: options.preEnqueuePluginMap, queueingHintMap: options.queueingHintMap, metricsRecorder: options.metricsRecorder, @@ -607,36 +614,48 @@ func (p *PriorityQueue) SchedulingCycle() int64 { // determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod // and determines the scheduling hint for this Pod while checking the events that happened during in-flight. func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) framework.QueueingHint { - if len(pInfo.UnschedulablePlugins) == 0 { - // When there is no unschedulable plugin, we cannot have a guess which event makes this Pod schedulable. - // Here, we use the latest requestCycle so that this Pod won't be stuck in the unschedulable pod pool for a long time. - if p.receivedEvents.Len() != 0 { + logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods)) + + // AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding. + // So, given pInfo should have been Pop()ed before, + // we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents. + inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID] + if !ok { + // This can happen while updating a pod. In that case pInfo.UnschedulablePlugins should + // be empty. If it is not, we may have a problem. + if len(pInfo.UnschedulablePlugins) != 0 { + logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod)) + return framework.QueueAfterBackoff + } + if p.inFlightEvents.Len() > len(p.inFlightPods) { return framework.QueueAfterBackoff } return framework.QueueSkip } - inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID] - if !ok { - // It shouldn't reach here unless there is a bug somewhere. - // But, set podSchedulingCycle to moveRequestCycle - // so that this Pod won't stuck in the unschedulable pod pool. - logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod)) - return framework.QueueAfterBackoff + if len(pInfo.UnschedulablePlugins) == 0 { + // When there is no unschedulable plugin, we cannot have a guess which event makes this Pod schedulable. + // If there has been any concurrent event for the pod, it has to go to the backoff queue because the event + // may have been relevant. + for event := inFlightPod.Next(); event != nil; event = event.Next() { + _, ok := event.Value.(*clusterEvent) + if ok { + // There really was a concurrent event. + return framework.QueueAfterBackoff + } + } + return framework.QueueSkip } - // AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding. - // So, given pInfo should have been Pop()ed before, - // we can assume pInfo must be recorded in inFlightPods. // check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins. - event := p.receivedEvents.Front() - if inFlightPod.previousEvent != nil { - // only check events that happened after the Pod was popped. - event = inFlightPod.previousEvent.Next() - } schedulingHint := framework.QueueSkip - for ; event != nil; event = event.Next() { - e := event.Value.(*clusterEvent) + for event := inFlightPod.Next(); event != nil; event = event.Next() { + e, ok := event.Value.(*clusterEvent) + if !ok { + // Must be another pod. Can be ignored. + continue + } + logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label) hint := p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj) if hint == framework.QueueSkip { @@ -816,11 +835,9 @@ func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) { pInfo := obj.(*framework.QueuedPodInfo) pInfo.Attempts++ p.schedulingCycle++ - // In flight, no move request yet. + // In flight, no concurrent events yet. if p.isSchedulingQueueHintEnabled { - p.inFlightPods[pInfo.Pod.UID] = inFlightPod{ - previousEvent: p.receivedEvents.Back(), - } + p.inFlightPods[pInfo.Pod.UID] = p.inFlightEvents.PushBack(pInfo.Pod) } for plugin := range pInfo.UnschedulablePlugins { @@ -842,7 +859,7 @@ func (p *PriorityQueue) Done(pod types.UID) { func (p *PriorityQueue) done(pod types.UID) { if !p.isSchedulingQueueHintEnabled { // do nothing if schedulingQueueHint is disabled. - // In that case, we don't have inFlightPods and receivedEvents. + // In that case, we don't have inFlightPods and inFlightEvents. return } inFlightPod, ok := p.inFlightPods[pod] @@ -852,35 +869,25 @@ func (p *PriorityQueue) done(pod types.UID) { } delete(p.inFlightPods, pod) - // remove events which is only referred from this Pod - // so that the receivedEvents map doesn't grow infinitely. + // Remove the pod from the list. + p.inFlightEvents.Remove(inFlightPod) - // Find the event that we should start. - // case1. If the previousEvent is nil, it means no receivedEvents when this Pod's scheduling started. - // We start from the first event in the receivedEvents. - // case2. If the previousEvent is not nil, but the inFlightPodsNum is 0, - // this previousEvent is removed from the list already. - // We start from the first event in the receivedEvents. - event := p.receivedEvents.Front() - if inFlightPod.previousEvent != nil && inFlightPod.previousEvent.Value.(*clusterEvent).inFlightPodsNum != 0 { - // case3. If the previousEvent is not nil, and the inFlightPodsNum is not 0, - // we can start from the next event of the previousEvent. - event = inFlightPod.previousEvent.Next() - } - - for event != nil { - e := event.Value.(*clusterEvent) - // decrement inFlightPodsNum on events that happened after the Pod is popped. - e.inFlightPodsNum-- - if e.inFlightPodsNum <= 0 { - // remove the event from the list if no Pod refers to it. - eventToDelete := event - // we need to take next event before removal. - event = event.Next() - p.receivedEvents.Remove(eventToDelete) - continue + // Remove events which are only referred to by this Pod + // so that the inFlightEvents list doesn't grow infinitely. + // If the pod was at the head of the list, then all + // events between it and the next pod are no longer needed + // and can be removed. + for { + e := p.inFlightEvents.Front() + if e == nil { + // Empty list. + break } - event = event.Next() + if _, ok := e.Value.(*clusterEvent); !ok { + // A pod, must stop pruning. + break + } + p.inFlightEvents.Remove(e) } } @@ -1108,14 +1115,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn // (no need to check the feature gate because there is always no p.inFlightPods when the feature is disabled.) if len(p.inFlightPods) != 0 { + logger.V(5).Info("Event received while pods are in flight", "event", event.Label, "numPods", len(p.inFlightPods)) // AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in // AddUnschedulableIfNotPresent we need to know whether events were // observed while scheduling them. - p.receivedEvents.PushBack(&clusterEvent{ - event: event, - inFlightPodsNum: len(p.inFlightPods), - oldObj: oldObj, - newObj: newObj, + p.inFlightEvents.PushBack(&clusterEvent{ + event: event, + oldObj: oldObj, + newObj: newObj, }) } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 541f3f7d42c..12b92bb7bdd 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -179,32 +179,37 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } } -func clusterEventsToList(clusterEvents []*clusterEvent) *list.List { - l := list.New() - for _, event := range clusterEvents { - l.PushBack(event) - } - return l -} - -func listToClusterEvents(l *list.List) []*clusterEvent { - clusterEvents := []*clusterEvent{} +func listToValues(l *list.List) []interface{} { + var values []interface{} for e := l.Front(); e != nil; e = e.Next() { - clusterEvents = append(clusterEvents, e.Value.(*clusterEvent)) + values = append(values, e.Value) } - return clusterEvents + return values } func Test_InFlightPods(t *testing.T) { pod := st.MakePod().Name("targetpod").UID("pod1").Obj() pod2 := st.MakePod().Name("targetpod2").UID("pod2").Obj() pod3 := st.MakePod().Name("targetpod3").UID("pod3").Obj() + var poppedPod, poppedPod2 *framework.QueuedPodInfo type action struct { // ONLY ONE of the following should be set. eventHappens *framework.ClusterEvent podPopped *v1.Pod podEnqueued *framework.QueuedPodInfo + callback func(t *testing.T, q *PriorityQueue) + } + + popPod := func(t *testing.T, q *PriorityQueue, pod *v1.Pod) *framework.QueuedPodInfo { + p, err := q.Pop() + if err != nil { + t.Fatalf("Pop failed: %v", err) + } + if p.Pod.UID != pod.UID { + t.Errorf("Unexpected popped pod: %v", p) + } + return p } tests := []struct { @@ -213,24 +218,24 @@ func Test_InFlightPods(t *testing.T) { // initialPods is the initial Pods in the activeQ. initialPods []*v1.Pod actions []action - wantInFlightPods map[types.UID]inFlightPod + wantInFlightPods []*v1.Pod + wantInFlightEvents []interface{} wantActiveQPodNames []string wantBackoffQPodNames []string wantUnschedPodPoolPodNames []string - wantReceivedEvents *list.List isSchedulingQueueHintEnabled bool }{ { - name: "when SchedulingQueueHint is disabled, inFlightPods and receivedEvents should be empty", + name: "when SchedulingQueueHint is disabled, inFlightPods and inFlightEvents should be empty", initialPods: []*v1.Pod{pod}, actions: []action{ // This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled. {podPopped: pod}, - // This event shouldn't be added to receivedEvents because SchedulingQueueHint is disabled. + // This event shouldn't be added to inFlightEvents because SchedulingQueueHint is disabled. {eventHappens: &PvAdd}, }, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: clusterEventsToList([]*clusterEvent{}), + wantInFlightPods: nil, + wantInFlightEvents: nil, }, { name: "when SchedulingQueueHint is disabled, which queue to enqueue Pod should be decided without SchedulingQueueHint", @@ -241,8 +246,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, wantBackoffQPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: list.New(), + wantInFlightPods: nil, + wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { // This hint fn tells that this event doesn't make a Pod schedulable. @@ -257,33 +262,26 @@ func Test_InFlightPods(t *testing.T) { }, }, { - name: "Pod is registered in inFlightPods with no previousEvent if Pod is popped from activeQ while no receivedEvents", + name: "Pod is registered in inFlightPods when Pod is popped from activeQ", isSchedulingQueueHintEnabled: true, initialPods: []*v1.Pod{pod}, actions: []action{ - // This won't be added to receivedEvents because no inFlightPods at this point. + // This won't be added to inFlightEvents because no inFlightPods at this point. {eventHappens: &PvcAdd}, - // This Pod has no previousEvent because no receivedEvents at this point. {podPopped: pod}, + // This gets added for the pod. {eventHappens: &PvAdd}, }, - wantInFlightPods: map[types.UID]inFlightPod{ - "pod1": { - // no previousEvent - }, - }, - wantReceivedEvents: clusterEventsToList([]*clusterEvent{ - {event: PvAdd, inFlightPodsNum: 1}, - }), + wantInFlightPods: []*v1.Pod{pod}, + wantInFlightEvents: []interface{}{pod, PvAdd}, }, { - name: "Pod, registered in inFlightPods with no previousEvent, is enqueued back to activeQ", + name: "Pod, registered in inFlightPods, is enqueued back to activeQ", isSchedulingQueueHintEnabled: true, initialPods: []*v1.Pod{pod, pod2}, actions: []action{ - // This won't be added to receivedEvents because no inFlightPods at this point. + // This won't be added to inFlightEvents because no inFlightPods at this point. {eventHappens: &PvcAdd}, - // This Pod has no previousEvent because no receivedEvents at this point. {podPopped: pod}, {eventHappens: &PvAdd}, {podPopped: pod2}, @@ -291,74 +289,47 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod)}, }, wantBackoffQPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{ - "pod2": { - // When pod is enqueued back to queue, inFlightPodsNum in previousEvent is also updated to 0. - previousEvent: &list.Element{Value: &clusterEvent{event: PvAdd, inFlightPodsNum: 0}}, - }, - }, - wantReceivedEvents: clusterEventsToList([]*clusterEvent{ - // event: PvAdd is removed when pod is enqueued back to queue. - {event: NodeAdd, inFlightPodsNum: 1}, // inFlightPodsNum is updated from 2 to 1. - }), + wantInFlightPods: []*v1.Pod{pod2}, + wantInFlightEvents: []interface{}{pod2, NodeAdd}, }, { - name: "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:0 is enqueued back to activeQ", + name: "All Pods registered in inFlightPods are enqueued back to activeQ", isSchedulingQueueHintEnabled: true, initialPods: []*v1.Pod{pod, pod2}, actions: []action{ - // This won't be added to receivedEvents because no inFlightPods at this point. + // This won't be added to inFlightEvents because no inFlightPods at this point. {eventHappens: &PvcAdd}, - // This Pod has no previousEvent because no receivedEvents at this point. {podPopped: pod}, {eventHappens: &PvAdd}, {podPopped: pod2}, {eventHappens: &NodeAdd}, {podEnqueued: newQueuedPodInfoForLookup(pod)}, {eventHappens: &CSINodeUpdate}, - // pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:0. {podEnqueued: newQueuedPodInfoForLookup(pod2)}, }, wantBackoffQPodNames: []string{"targetpod", "targetpod2"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: clusterEventsToList([]*clusterEvent{ - // all events are correctly cleaned up. - }), + wantInFlightPods: nil, + wantInFlightEvents: nil, }, { - name: "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero is enqueued back to activeQ", + name: "One intermediate Pod registered in inFlightPods is enqueued back to activeQ", isSchedulingQueueHintEnabled: true, initialPods: []*v1.Pod{pod, pod2, pod3}, actions: []action{ - // This won't be added to receivedEvents because no inFlightPods at this point. + // This won't be added to inFlightEvents because no inFlightPods at this point. {eventHappens: &PvcAdd}, - // This Pod has no previousEvent because no receivedEvents at this point. {podPopped: pod}, {eventHappens: &PvAdd}, - // This Pod will get previousEvent (PvAdd). {podPopped: pod2}, {eventHappens: &NodeAdd}, - // This Pod will get previousEvent (NodeAdd). // This Pod won't be requeued again. {podPopped: pod3}, {eventHappens: &AssignedPodAdd}, - // pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero. {podEnqueued: newQueuedPodInfoForLookup(pod2)}, }, wantBackoffQPodNames: []string{"targetpod2"}, - wantInFlightPods: map[types.UID]inFlightPod{ - "pod1": { - // no previousEvent - }, - "pod3": { - previousEvent: &list.Element{Value: &clusterEvent{event: NodeAdd, inFlightPodsNum: 1}}, - }, - }, - wantReceivedEvents: clusterEventsToList([]*clusterEvent{ - {event: PvAdd, inFlightPodsNum: 1}, - {event: NodeAdd, inFlightPodsNum: 1}, - {event: AssignedPodAdd, inFlightPodsNum: 2}, - }), + wantInFlightPods: []*v1.Pod{pod, pod3}, + wantInFlightEvents: []interface{}{pod, PvAdd, NodeAdd, pod3, AssignedPodAdd}, }, { name: "events before popping Pod are ignored", @@ -371,8 +342,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: list.New(), + wantInFlightPods: nil, + wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { // fooPlugin1 has a queueing hint function for AssignedPodAdd, @@ -396,8 +367,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod)}, }, wantBackoffQPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: list.New(), + wantInFlightPods: nil, + wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { // It will be ignored because no failed plugin. @@ -420,8 +391,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: list.New(), + wantInFlightPods: nil, + wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { // fooPlugin1 has no queueing hint function for NodeAdd. @@ -445,8 +416,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, }, wantUnschedPodPoolPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: list.New(), + wantInFlightPods: nil, + wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { // fooPlugin1 has a queueing hint function for AssignedPodAdd, @@ -470,8 +441,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2", "fooPlugin3")}, }, wantActiveQPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: list.New(), + wantInFlightPods: nil, + wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { AssignedPodAdd: { @@ -504,8 +475,8 @@ func Test_InFlightPods(t *testing.T) { {podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")}, }, wantBackoffQPodNames: []string{"targetpod"}, - wantInFlightPods: map[types.UID]inFlightPod{}, - wantReceivedEvents: list.New(), + wantInFlightPods: nil, + wantInFlightEvents: nil, queueingHintMap: QueueingHintMapPerProfile{ "": { AssignedPodAdd: { @@ -523,6 +494,54 @@ func Test_InFlightPods(t *testing.T) { }, }, }, + { + name: "pod is enqueued to activeQ because the failed plugin has a hint fn and it returns QueueImmediately for a concurrent event that was received while some other pod was in flight", + isSchedulingQueueHintEnabled: true, + initialPods: []*v1.Pod{pod, pod2}, + actions: []action{ + {callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, q, pod) }}, + {eventHappens: &NodeAdd}, + {callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, q, pod2) }}, + {eventHappens: &AssignedPodAdd}, + {callback: func(t *testing.T, q *PriorityQueue) { + logger, _ := ktesting.NewTestContext(t) + if err := q.AddUnschedulableIfNotPresent(logger, poppedPod, q.SchedulingCycle()); err != nil { + t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + }}, + {callback: func(t *testing.T, q *PriorityQueue) { + logger, _ := ktesting.NewTestContext(t) + poppedPod2.UnschedulablePlugins = sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3") + if err := q.AddUnschedulableIfNotPresent(logger, poppedPod2, q.SchedulingCycle()); err != nil { + t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + }}, + }, + wantActiveQPodNames: []string{pod2.Name}, + wantInFlightPods: nil, + wantInFlightEvents: nil, + queueingHintMap: QueueingHintMapPerProfile{ + "": { + AssignedPodAdd: { + { + // it will be ignored because the hint fn returns QueueSkip that is weaker than queueHintReturnQueueImmediately from fooPlugin1. + PluginName: "fooPlugin3", + QueueingHintFn: queueHintReturnQueueSkip, + }, + { + // it will be ignored because the hint fn returns QueueAfterBackoff that is weaker than queueHintReturnQueueImmediately from fooPlugin1. + PluginName: "fooPlugin2", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + { + // The hint fn tells that this event makes a Pod scheudlable immediately. + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueImmediately, + }, + }, + }, + }, + }, } for _, test := range tests { @@ -544,27 +563,37 @@ func Test_InFlightPods(t *testing.T) { for _, action := range test.actions { switch { case action.podPopped != nil: - p, err := q.Pop() - if err != nil { - t.Fatalf("Pop failed: %v", err) - } - if p.Pod.UID != action.podPopped.UID { - t.Errorf("Unexpected popped pod: %v", p) - } - continue + popPod(t, q, action.podPopped) case action.eventHappens != nil: q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil) case action.podEnqueued != nil: q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle()) + case action.callback != nil: + action.callback(t, q) } } - if diff := cmp.Diff(test.wantInFlightPods, q.inFlightPods, cmp.AllowUnexported(inFlightPod{}, list.Element{}, clusterEvent{}), cmpopts.IgnoreFields(list.Element{}, "next", "prev", "list")); diff != "" { + actualInFlightPods := make(map[types.UID]*v1.Pod) + for uid, element := range q.inFlightPods { + actualInFlightPods[uid] = element.Value.(*v1.Pod) + } + wantInFlightPods := make(map[types.UID]*v1.Pod) + for _, pod := range test.wantInFlightPods { + wantInFlightPods[pod.UID] = pod + } + if diff := cmp.Diff(wantInFlightPods, actualInFlightPods); diff != "" { t.Errorf("Unexpected diff in inFlightPods (-want, +got):\n%s", diff) } - if diff := cmp.Diff(listToClusterEvents(test.wantReceivedEvents), listToClusterEvents(q.receivedEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" { - t.Errorf("Unexpected diff in receivedEvents (-want, +got):\n%s", diff) + var wantInFlightEvents []interface{} + for _, value := range test.wantInFlightEvents { + if event, ok := value.(framework.ClusterEvent); ok { + value = &clusterEvent{event: event} + } + wantInFlightEvents = append(wantInFlightEvents, value) + } + if diff := cmp.Diff(wantInFlightEvents, listToValues(q.inFlightEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" { + t.Errorf("Unexpected diff in inFlightEvents (-want, +got):\n%s", diff) } if test.wantActiveQPodNames != nil { @@ -1231,28 +1260,33 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } + expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } + expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + expectInFlightPods(t, q) // Construct a Pod, but don't associate its scheduler failure to any plugin - hpp1 := highPriorityPodInfo.Pod.DeepCopy() - hpp1.Name = "hpp1" + hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") q.activeQ.Add(q.newQueuedPodInfo(hpp1)) if p, err := q.Pop(); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } + expectInFlightPods(t, q, hpp1.UID) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle()) + expectInFlightPods(t, q) // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". - hpp2 := highPriorityPodInfo.Pod.DeepCopy() - hpp2.Name = "hpp2" + hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") q.activeQ.Add(q.newQueuedPodInfo(hpp2)) if p, err := q.Pop(); err != nil || p.Pod != hpp2 { t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) } + expectInFlightPods(t, q, hpp2.UID) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle()) + expectInFlightPods(t, q) // Pods is still backing off, move the pod into backoffQ. q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) q.Add(logger, medPriorityPodInfo.Pod) @@ -1263,32 +1297,41 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod, p.Pod.Name) } + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) // hpp2 won't be moved. if q.podBackoffQ.Len() != 3 { t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) } // pop out the pods in the backoffQ. + // This doesn't make them in-flight pods. for q.podBackoffQ.Len() != 0 { q.podBackoffQ.Pop() } + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) q.schedulingCycle++ q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) q.activeQ.Add(q.newQueuedPodInfo(hpp1)) if p, err := q.Pop(); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, hpp1.UID) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle()) + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) q.Add(logger, medPriorityPodInfo.Pod) for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp1, hpp2} { if q.unschedulablePods.get(pod) == nil { @@ -1306,6 +1349,35 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { if q.podBackoffQ.Len() != 0 { t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) } + expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) +} + +func clonePod(pod *v1.Pod, newName string) *v1.Pod { + pod = pod.DeepCopy() + pod.Name = newName + pod.UID = types.UID(pod.Name + pod.Namespace) + return pod +} + +func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) { + t.Helper() + var actualUIDs []types.UID + for uid := range q.inFlightPods { + actualUIDs = append(actualUIDs, uid) + } + sortUIDs := cmpopts.SortSlices(func(a, b types.UID) bool { return a < b }) + if diff := cmp.Diff(uids, actualUIDs, sortUIDs); diff != "" { + t.Fatalf("Unexpected content of inFlightPods (-want, +have):\n%s", diff) + } + actualUIDs = nil + for e := q.inFlightEvents.Front(); e != nil; e = e.Next() { + if pod, ok := e.Value.(*v1.Pod); ok { + actualUIDs = append(actualUIDs, pod.UID) + } + } + if diff := cmp.Diff(uids, actualUIDs, sortUIDs); diff != "" { + t.Fatalf("Unexpected pods in inFlightEvents (-want, +have):\n%s", diff) + } } // TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that