diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 4c91d7a7ac7..da6b7274219 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -27,6 +27,7 @@ limitations under the License.
 package queue
 
 import (
+	"container/list"
 	"context"
 	"fmt"
 	"math/rand"
@@ -104,6 +105,9 @@ type SchedulingQueue interface {
 	// Pop removes the head of the queue and returns it. It blocks if the
 	// queue is empty and waits until a new item is added to the queue.
 	Pop() (*framework.QueuedPodInfo, error)
+	// Done must be called for a pod returned by Pop. This allows the queue to
+	// keep track of which pods are currently being processed.
+	Done(types.UID)
 	Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 	Delete(pod *v1.Pod) error
 	// TODO(sanposhiho): move all PreEnqueueCkeck to Requeue and delete it from this parameter eventually.
@@ -158,6 +162,13 @@ type PriorityQueue struct {
 
 	cond sync.Cond
 
+	// inFlightPods holds the UIDs of all pods that have been popped out but for which Done
+	// hasn't been called yet - in other words, all pods that are currently being
+	// processed (being scheduled, in permit, or in the binding cycle).
+	inFlightPods map[types.UID]inFlightPod
+	// receivedEvents holds the events received by the scheduling queue.
+	receivedEvents *list.List
+
 	// activeQ is heap structure that scheduler actively looks at to find pods to
 	// schedule. Head of heap is the highest priority pod.
 	activeQ *heap.Heap
@@ -173,6 +184,7 @@ type PriorityQueue struct {
 	// received a move request. Unschedulable pods in and before this scheduling
 	// cycle will be put back to activeQueue if we were trying to schedule them
 	// when we received move request.
+	// TODO: this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed.
 	moveRequestCycle int64
 
 	// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
@@ -189,6 +201,9 @@ type PriorityQueue struct {
 	metricsRecorder metrics.MetricAsyncRecorder
 	// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
 	pluginMetricsSamplePercent int
+
+	// isSchedulingQueueHintEnabled indicates whether the SchedulerQueueingHints feature gate is enabled.
+	isSchedulingQueueHintEnabled bool
 }
 
 // QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName.
@@ -197,6 +212,25 @@ type QueueingHintFunction struct {
 	QueueingHintFn framework.QueueingHintFn
 }
 
+type inFlightPod struct {
+	// previousEvent is the latest observed event when the pod was popped.
+	previousEvent *list.Element
+}
+
+// clusterEvent has the event and involved objects.
+type clusterEvent struct {
+	event framework.ClusterEvent
+	// oldObj is the object involved in this event (its state before the event).
+	oldObj interface{}
+	// newObj is the object involved in this event (its state after the event).
+	newObj interface{}
+
+	// inFlightPodsNum is the counter of pods referring to this cluster event.
+	// It is initialized with the number of Pods being scheduled when the event is received,
+	// and is decremented as scheduling for each of those Pods is Done().
+	inFlightPodsNum int
+}
+
 type priorityQueueOptions struct {
 	clock                     clock.Clock
 	podInitialBackoffDuration time.Duration
@@ -330,11 +364,14 @@ func NewPriorityQueue(
 		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
 		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
 		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
-		moveRequestCycle:                  -1,
+		inFlightPods:                      make(map[types.UID]inFlightPod),
+		receivedEvents:                    list.New(),
 		preEnqueuePluginMap:               options.preEnqueuePluginMap,
 		queueingHintMap:                   options.queueingHintMap,
 		metricsRecorder:                   options.metricsRecorder,
 		pluginMetricsSamplePercent:        options.pluginMetricsSamplePercent,
+		moveRequestCycle:                  -1,
+		isSchedulingQueueHintEnabled:      utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
 	}
 	pq.cond.L = &pq.lock
 	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
@@ -567,25 +604,67 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
 	return p.schedulingCycle
 }
 
-// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
+// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
+// and determines the scheduling hint for this Pod based on the events that happened while it was in flight.
+func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) framework.QueueingHint {
+	if len(pInfo.UnschedulablePlugins) == 0 {
+		// When there is no unschedulable plugin, we cannot guess which event could make this Pod schedulable.
+		// So, if any event was received, return QueueAfterBackoff so that this Pod won't be stuck in the unschedulable pod pool for a long time.
+		if p.receivedEvents.Len() != 0 {
+			return framework.QueueAfterBackoff
+		}
+		return framework.QueueSkip
+	}
+
+	inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
+	if !ok {
+		// It shouldn't reach here unless there is a bug somewhere.
+		// But, return QueueAfterBackoff
+		// so that this Pod won't get stuck in the unschedulable pod pool.
+		logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
+		return framework.QueueAfterBackoff
+	}
+
+	// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
+	// So, the given pInfo should have been Pop()ed before,
+	// and we can assume it must be recorded in inFlightPods.
+	// Check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
+	event := p.receivedEvents.Front()
+	if inFlightPod.previousEvent != nil {
+		// Only check events that happened after the Pod was popped.
+		event = inFlightPod.previousEvent.Next()
+	}
+	schedulingHint := framework.QueueSkip
+	for ; event != nil; event = event.Next() {
+		e := event.Value.(*clusterEvent)
+
+		hint := p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj)
+		if hint == framework.QueueSkip {
+			continue
+		}
+
+		if hint == framework.QueueImmediately {
+			// QueueImmediately is the strongest opinion, we don't need to check other events.
+			schedulingHint = framework.QueueImmediately
+			break
+		}
+		if hint == framework.QueueAfterBackoff {
+			// Replace schedulingHint with QueueAfterBackoff,
+			// but keep checking other events because a later event may return QueueImmediately.
+			schedulingHint = framework.QueueAfterBackoff
+		}
+	}
+	return schedulingHint
+}
+
+// addUnschedulableWithoutQueueingHint inserts a pod that cannot be scheduled into
 // the queue, unless it is already in the queue. Normally, PriorityQueue puts
 // unschedulable pods in `unschedulablePods`. But if there has been a recent move
 // request, then the pod is put in `podBackoffQ`.
-func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+// TODO: This function is called only when p.isSchedulingQueueHintEnabled is false,
+// and this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed.
+func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
 	pod := pInfo.Pod
-	if p.unschedulablePods.get(pod) != nil {
-		return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
-	}
-
-	if _, exists, _ := p.activeQ.Get(pInfo); exists {
-		return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
-	}
-	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
-		return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
-	}
-
 	// Refresh the timestamp since the pod is re-added.
 	pInfo.Timestamp = p.clock.Now()
 
@@ -610,6 +689,54 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
 	return nil
 }
 
+// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
+// the queue, unless it is already in the queue. Normally, PriorityQueue puts
+// unschedulable pods in `unschedulablePods`. But if there has been a recent move
+// request, then the pod is put in `podBackoffQ`.
+func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	// In any case, this Pod will be moved back to the queue, so we should call Done.
+	defer p.done(pInfo.Pod.UID)
+
+	pod := pInfo.Pod
+	if p.unschedulablePods.get(pod) != nil {
+		return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
+	}
+
+	if _, exists, _ := p.activeQ.Get(pInfo); exists {
+		return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
+	}
+	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
+		return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
+	}
+
+	if !p.isSchedulingQueueHintEnabled {
+		// Fall back to the old behavior, which doesn't depend on the queueing hint.
+		return p.addUnschedulableWithoutQueueingHint(logger, pInfo, podSchedulingCycle)
+	}
+
+	// Refresh the timestamp since the pod is re-added.
+	pInfo.Timestamp = p.clock.Now()
+
+	// Record the plugins that rejected this Pod in the metrics.
+	for plugin := range pInfo.UnschedulablePlugins {
+		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
+	}
+
+	// Based on isPodWorthRequeuing(), check whether any of the events that happened during scheduling may change this Pod's scheduling result.
+	schedulingHint := p.determineSchedulingHintForInFlightPod(logger, pInfo, podSchedulingCycle)
+
+	// Based on the hint, requeue this Pod to activeQ/backoffQ, or put it back into the unschedulable pod pool.
+	queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
+	logger.V(6).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle)
+
+	p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
+	return nil
+}
+
 // flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
 func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
 	p.lock.Lock()
@@ -685,9 +812,74 @@ func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
 	pInfo := obj.(*framework.QueuedPodInfo)
 	pInfo.Attempts++
 	p.schedulingCycle++
+	// The Pod is now in flight: record the latest event observed so far (no move request yet).
+	if p.isSchedulingQueueHintEnabled {
+		p.inFlightPods[pInfo.Pod.UID] = inFlightPod{
+			previousEvent: p.receivedEvents.Back(),
+		}
+	}
+
+	for plugin := range pInfo.UnschedulablePlugins {
+		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
+	}
+
 	return pInfo, nil
 }
 
+// Done must be called for a pod returned by Pop. This allows the queue to
+// keep track of which pods are currently being processed.
+func (p *PriorityQueue) Done(pod types.UID) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.done(pod)
+}
+
+func (p *PriorityQueue) done(pod types.UID) {
+	if !p.isSchedulingQueueHintEnabled {
+		// Do nothing if SchedulingQueueHint is disabled.
+		// In that case, we don't have inFlightPods and receivedEvents.
+		return
+	}
+	inFlightPod, ok := p.inFlightPods[pod]
+	if !ok {
+		// This Pod has already been done()ed.
+		return
+	}
+	delete(p.inFlightPods, pod)
+
+	// Remove events that are referred to only by this Pod
+	// so that the receivedEvents list doesn't grow indefinitely.
+
+	// Find the event from which we should start.
+	// case1. If previousEvent is nil, there were no receivedEvents when this Pod's scheduling started.
+	// We start from the first event in receivedEvents.
+	// case2. If previousEvent is not nil, but its inFlightPodsNum is 0,
+	// that previousEvent has already been removed from the list.
+	// We start from the first event in receivedEvents as well.
+	event := p.receivedEvents.Front()
+	if inFlightPod.previousEvent != nil && inFlightPod.previousEvent.Value.(*clusterEvent).inFlightPodsNum != 0 {
+		// case3. If previousEvent is not nil and its inFlightPodsNum is not 0,
+		// we can start from the event right after previousEvent.
+		event = inFlightPod.previousEvent.Next()
+	}
+
+	for event != nil {
+		e := event.Value.(*clusterEvent)
+		// Decrement inFlightPodsNum on events that happened after the Pod was popped.
+		e.inFlightPodsNum--
+		if e.inFlightPodsNum <= 0 {
+			// Remove the event from the list once no Pod refers to it.
+			eventToDelete := event
+			// We need to take the next event before removal.
+			event = event.Next()
+			p.receivedEvents.Remove(eventToDelete)
+			continue
+		}
+		event = event.Next()
+	}
+}
+
 // isPodUpdated checks if the pod is updated in a way that it may have become
 // schedulable. It drops status of the pod and compares it with old version,
 // except for pod.status.resourceClaimStatuses: changing that may have an
@@ -853,7 +1045,7 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event
 func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, schedulingHint framework.QueueingHint, event string) string {
 	if schedulingHint == framework.QueueSkip {
 		p.unschedulablePods.addOrUpdate(pInfo)
-		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
+		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc()
 		return unschedulablePods
 	}
 
@@ -907,7 +1099,22 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
 			activated = true
 		}
 	}
+	p.moveRequestCycle = p.schedulingCycle
+
+	// (No need to check the feature gate here, because p.inFlightPods is always empty when the feature is disabled.)
+	if len(p.inFlightPods) != 0 {
+		// AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in
+		// AddUnschedulableIfNotPresent we need to know whether events were
+		// observed while scheduling them.
+		p.receivedEvents.PushBack(&clusterEvent{
+			event:           event,
+			inFlightPodsNum: len(p.inFlightPods),
+			oldObj:          oldObj,
+			newObj:          newObj,
+		})
+	}
+
 	if activated {
 		p.cond.Broadcast()
 	}
@@ -1233,24 +1440,6 @@ func newPodNominator(podLister listersv1.PodLister) *nominator {
 	}
 }
 
-// MakeNextPodFunc returns a function to retrieve the next pod from a given
-// scheduling queue
-func MakeNextPodFunc(logger klog.Logger, queue SchedulingQueue) func() *framework.QueuedPodInfo {
-	return func() *framework.QueuedPodInfo {
-		podInfo, err := queue.Pop()
-		if err == nil && podInfo != nil {
-			logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
-			for plugin := range podInfo.UnschedulablePlugins {
-				metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
-			}
-			return podInfo
-		} else if err != nil {
-			logger.Error(err, "Error while retrieving next pod from scheduling queue")
-		}
-		return nil
-	}
-}
-
 func podInfoKeyFunc(obj interface{}) (string, error) {
 	return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod)
 }
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go
index 3ddf38ac1ce..541f3f7d42c 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package queue
 
 import (
+	"container/list"
 	"context"
 	"fmt"
 	"math"
@@ -32,12 +33,15 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
 	"k8s.io/klog/v2/ktesting"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -175,14 +179,454 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
 	}
 }
 
+func clusterEventsToList(clusterEvents []*clusterEvent) *list.List {
+	l := list.New()
+	for _, event := range clusterEvents {
+		l.PushBack(event)
+	}
+	return l
+}
+
+func listToClusterEvents(l *list.List) []*clusterEvent {
+	clusterEvents := []*clusterEvent{}
+	for e := l.Front(); e != nil; e = e.Next() {
+		clusterEvents = append(clusterEvents, e.Value.(*clusterEvent))
+	}
+	return clusterEvents
+}
+
+func Test_InFlightPods(t *testing.T) {
+	pod := st.MakePod().Name("targetpod").UID("pod1").Obj()
+	pod2 := st.MakePod().Name("targetpod2").UID("pod2").Obj()
+	pod3 := st.MakePod().Name("targetpod3").UID("pod3").Obj()
+
+	type action struct {
+		// ONLY ONE of the following should be set.
+		eventHappens *framework.ClusterEvent
+		podPopped    *v1.Pod
+		podEnqueued  *framework.QueuedPodInfo
+	}
+
+	tests := []struct {
+		name            string
+		queueingHintMap QueueingHintMapPerProfile
+		// initialPods are the initial Pods in the activeQ.
+		initialPods                  []*v1.Pod
+		actions                      []action
+		wantInFlightPods             map[types.UID]inFlightPod
+		wantActiveQPodNames          []string
+		wantBackoffQPodNames         []string
+		wantUnschedPodPoolPodNames   []string
+		wantReceivedEvents           *list.List
+		isSchedulingQueueHintEnabled bool
+	}{
+		{
+			name:        "when SchedulingQueueHint is disabled, inFlightPods and receivedEvents should be empty",
+			initialPods: []*v1.Pod{pod},
+			actions: []action{
+				// This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled.
+				{podPopped: pod},
+				// This event shouldn't be added to receivedEvents because SchedulingQueueHint is disabled.
+				{eventHappens: &PvAdd},
+			},
+			wantInFlightPods:   map[types.UID]inFlightPod{},
+			wantReceivedEvents: clusterEventsToList([]*clusterEvent{}),
+		},
+		{
+			name:        "when SchedulingQueueHint is disabled, the queue to put the Pod back into should be decided without SchedulingQueueHint",
+			initialPods: []*v1.Pod{pod},
+			actions: []action{
+				{podPopped: pod},
+				{eventHappens: &AssignedPodAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
+			},
+			wantBackoffQPodNames: []string{"targetpod"},
+			wantInFlightPods:     map[types.UID]inFlightPod{},
+			wantReceivedEvents:   list.New(),
+			queueingHintMap: QueueingHintMapPerProfile{
+				"": {
+					// This hint fn indicates that this event doesn't make a Pod schedulable.
+					// However, this QueueingHintFn will actually be ignored because SchedulingQueueHint is disabled.
+					AssignedPodAdd: {
+						{
+							PluginName:     "fooPlugin1",
+							QueueingHintFn: queueHintReturnQueueSkip,
+						},
+					},
+				},
+			},
+		},
+		{
+			name:                         "Pod is registered in inFlightPods with no previousEvent if Pod is popped from activeQ while receivedEvents is empty",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod},
+			actions: []action{
+				// This won't be added to receivedEvents because there are no inFlightPods at this point.
+				{eventHappens: &PvcAdd},
+				// This Pod has no previousEvent because there are no receivedEvents at this point.
+				{podPopped: pod},
+				{eventHappens: &PvAdd},
+			},
+			wantInFlightPods: map[types.UID]inFlightPod{
+				"pod1": {
+					// no previousEvent
+				},
+			},
+			wantReceivedEvents: clusterEventsToList([]*clusterEvent{
+				{event: PvAdd, inFlightPodsNum: 1},
+			}),
+		},
+		{
+			name:                         "Pod, registered in inFlightPods with no previousEvent, is enqueued back to activeQ",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod, pod2},
+			actions: []action{
+				// This won't be added to receivedEvents because there are no inFlightPods at this point.
+				{eventHappens: &PvcAdd},
+				// This Pod has no previousEvent because there are no receivedEvents at this point.
+				{podPopped: pod},
+				{eventHappens: &PvAdd},
+				{podPopped: pod2},
+				{eventHappens: &NodeAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod)},
+			},
+			wantBackoffQPodNames: []string{"targetpod"},
+			wantInFlightPods: map[types.UID]inFlightPod{
+				"pod2": {
+					// When the pod is enqueued back to the queue, inFlightPodsNum in previousEvent is also updated to 0.
+					previousEvent: &list.Element{Value: &clusterEvent{event: PvAdd, inFlightPodsNum: 0}},
+				},
+			},
+			wantReceivedEvents: clusterEventsToList([]*clusterEvent{
+				// The PvAdd event is removed when the pod is enqueued back to the queue.
+				{event: NodeAdd, inFlightPodsNum: 1}, // inFlightPodsNum is updated from 2 to 1.
+			}),
+		},
+		{
+			name:                         "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:0 is enqueued back to activeQ",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod, pod2},
+			actions: []action{
+				// This won't be added to receivedEvents because there are no inFlightPods at this point.
+				{eventHappens: &PvcAdd},
+				// This Pod has no previousEvent because there are no receivedEvents at this point.
+				{podPopped: pod},
+				{eventHappens: &PvAdd},
+				{podPopped: pod2},
+				{eventHappens: &NodeAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod)},
+				{eventHappens: &CSINodeUpdate},
+				// pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:0.
+				{podEnqueued: newQueuedPodInfoForLookup(pod2)},
+			},
+			wantBackoffQPodNames: []string{"targetpod", "targetpod2"},
+			wantInFlightPods:     map[types.UID]inFlightPod{},
+			wantReceivedEvents: clusterEventsToList([]*clusterEvent{
+				// all events are correctly cleaned up.
+			}),
+		},
+		{
+			name:                         "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero is enqueued back to activeQ",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod, pod2, pod3},
+			actions: []action{
+				// This won't be added to receivedEvents because there are no inFlightPods at this point.
+				{eventHappens: &PvcAdd},
+				// This Pod has no previousEvent because there are no receivedEvents at this point.
+				{podPopped: pod},
+				{eventHappens: &PvAdd},
+				// This Pod will get previousEvent (PvAdd).
+				{podPopped: pod2},
+				{eventHappens: &NodeAdd},
+				// This Pod will get previousEvent (NodeAdd).
+				// This Pod won't be requeued again.
+				{podPopped: pod3},
+				{eventHappens: &AssignedPodAdd},
+				// pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero.
+				{podEnqueued: newQueuedPodInfoForLookup(pod2)},
+			},
+			wantBackoffQPodNames: []string{"targetpod2"},
+			wantInFlightPods: map[types.UID]inFlightPod{
+				"pod1": {
+					// no previousEvent
+				},
+				"pod3": {
+					previousEvent: &list.Element{Value: &clusterEvent{event: NodeAdd, inFlightPodsNum: 1}},
+				},
+			},
+			wantReceivedEvents: clusterEventsToList([]*clusterEvent{
+				{event: PvAdd, inFlightPodsNum: 1},
+				{event: NodeAdd, inFlightPodsNum: 1},
+				{event: AssignedPodAdd, inFlightPodsNum: 2},
+			}),
+		},
+		{
+			name:                         "events before popping Pod are ignored",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod},
+			actions: []action{
+				{eventHappens: &WildCardEvent},
+				{podPopped: pod},
+				{eventHappens: &AssignedPodAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
+			},
+			wantUnschedPodPoolPodNames: []string{"targetpod"},
+			wantInFlightPods:           map[types.UID]inFlightPod{},
+			wantReceivedEvents:         list.New(),
+			queueingHintMap: QueueingHintMapPerProfile{
+				"": {
+					// fooPlugin1 has a queueing hint function for AssignedPodAdd,
+					// but the hint fn indicates that this event doesn't make a Pod schedulable.
+					AssignedPodAdd: {
+						{
+							PluginName:     "fooPlugin1",
+							QueueingHintFn: queueHintReturnQueueSkip,
+						},
+					},
+				},
+			},
+		},
+		{
+			name:                         "pod is enqueued to backoffQ if there is no failed plugin",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod},
+			actions: []action{
+				{podPopped: pod},
+				{eventHappens: &AssignedPodAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod)},
+			},
+			wantBackoffQPodNames: []string{"targetpod"},
+			wantInFlightPods:     map[types.UID]inFlightPod{},
+			wantReceivedEvents:   list.New(),
+			queueingHintMap: QueueingHintMapPerProfile{
+				"": {
+					// It will be ignored because there is no failed plugin.
+					AssignedPodAdd: {
+						{
+							PluginName:     "fooPlugin1",
+							QueueingHintFn: queueHintReturnQueueImmediately,
+						},
+					},
+				},
+			},
+		},
+		{
+			name:                         "pod is enqueued to unschedulable pod pool if there is no event that can make the pod schedulable",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod},
+			actions: []action{
+				{podPopped: pod},
+				{eventHappens: &NodeAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
+			},
+			wantUnschedPodPoolPodNames: []string{"targetpod"},
+			wantInFlightPods:           map[types.UID]inFlightPod{},
+			wantReceivedEvents:         list.New(),
+			queueingHintMap: QueueingHintMapPerProfile{
+				"": {
+					// fooPlugin1 has no queueing hint function for NodeAdd.
+					AssignedPodAdd: {
+						{
+							// It will be ignored because the event is not NodeAdd.
+							PluginName:     "fooPlugin1",
+							QueueingHintFn: queueHintReturnQueueImmediately,
+						},
+					},
+				},
+			},
+		},
+		{
+			name:                         "pod is enqueued to unschedulable pod pool because the failed plugin has a hint fn but it returns QueueSkip",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod},
+			actions: []action{
+				{podPopped: pod},
+				{eventHappens: &AssignedPodAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
+			},
+			wantUnschedPodPoolPodNames: []string{"targetpod"},
+			wantInFlightPods:           map[types.UID]inFlightPod{},
+			wantReceivedEvents:         list.New(),
+			queueingHintMap: QueueingHintMapPerProfile{
+				"": {
+					// fooPlugin1 has a queueing hint function for AssignedPodAdd,
+					// but the hint fn indicates that this event doesn't make a Pod schedulable.
+					AssignedPodAdd: {
+						{
+							PluginName:     "fooPlugin1",
+							QueueingHintFn: queueHintReturnQueueSkip,
+						},
+					},
+				},
+			},
+		},
+		{
+			name:                         "pod is enqueued to activeQ because the failed plugin has a hint fn and it returns QueueImmediately",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod},
+			actions: []action{
+				{podPopped: pod},
+				{eventHappens: &AssignedPodAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2", "fooPlugin3")},
+			},
+			wantActiveQPodNames: []string{"targetpod"},
+			wantInFlightPods:    map[types.UID]inFlightPod{},
+			wantReceivedEvents:  list.New(),
+			queueingHintMap: QueueingHintMapPerProfile{
+				"": {
+					AssignedPodAdd: {
+						{
+							// It will be ignored because this hint fn returns QueueSkip, which is weaker than the QueueImmediately returned for fooPlugin1.
+							PluginName:     "fooPlugin3",
+							QueueingHintFn: queueHintReturnQueueSkip,
+						},
+						{
+							// It will be ignored because this hint fn returns QueueAfterBackoff, which is weaker than the QueueImmediately returned for fooPlugin1.
+							PluginName:     "fooPlugin2",
+							QueueingHintFn: queueHintReturnQueueAfterBackoff,
+						},
+						{
+							// The hint fn indicates that this event makes a Pod schedulable immediately.
+							PluginName:     "fooPlugin1",
+							QueueingHintFn: queueHintReturnQueueImmediately,
+						},
+					},
+				},
+			},
+		},
+		{
+			name:                         "pod is enqueued to backoffQ because the failed plugin has a hint fn and it returns QueueAfterBackoff",
+			isSchedulingQueueHintEnabled: true,
+			initialPods:                  []*v1.Pod{pod},
+			actions: []action{
+				{podPopped: pod},
+				{eventHappens: &AssignedPodAdd},
+				{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")},
+			},
+			wantBackoffQPodNames: []string{"targetpod"},
+			wantInFlightPods:     map[types.UID]inFlightPod{},
+			wantReceivedEvents:   list.New(),
+			queueingHintMap: QueueingHintMapPerProfile{
+				"": {
+					AssignedPodAdd: {
+						{
+							// It will be ignored because this hint fn returns QueueSkip, which is weaker than the QueueAfterBackoff returned for fooPlugin1.
+							PluginName:     "fooPlugin2",
+							QueueingHintFn: queueHintReturnQueueSkip,
+						},
+						{
+							// The hint fn indicates that this event makes a Pod schedulable.
+							PluginName:     "fooPlugin1",
+							QueueingHintFn: queueHintReturnQueueAfterBackoff,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, test.isSchedulingQueueHintEnabled)()
+			logger, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			obj := make([]runtime.Object, 0, len(test.initialPods))
+			for _, p := range test.initialPods {
+				obj = append(obj, p)
+			}
+			q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), obj, WithQueueingHintMapPerProfile(test.queueingHintMap))
+
+			for _, p := range test.initialPods {
+				q.Add(logger, p)
+			}
+
+			for _, action := range test.actions {
+				switch {
+				case action.podPopped != nil:
+					p, err := q.Pop()
+					if err != nil {
+						t.Fatalf("Pop failed: %v", err)
+					}
+					if p.Pod.UID != action.podPopped.UID {
+						t.Errorf("Unexpected popped pod: %v", p)
+					}
+					continue
+				case action.eventHappens != nil:
+					q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil)
+				case action.podEnqueued != nil:
+					q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle())
+				}
+			}
+
+			if diff := cmp.Diff(test.wantInFlightPods, q.inFlightPods, cmp.AllowUnexported(inFlightPod{}, list.Element{}, clusterEvent{}), cmpopts.IgnoreFields(list.Element{}, "next", "prev", "list")); diff != "" {
+				t.Errorf("Unexpected diff in inFlightPods (-want, +got):\n%s", diff)
+			}
+
+			if diff := cmp.Diff(listToClusterEvents(test.wantReceivedEvents), listToClusterEvents(q.receivedEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" {
+				t.Errorf("Unexpected diff in receivedEvents (-want, +got):\n%s", diff)
+			}
+
+			if test.wantActiveQPodNames != nil {
+				rawPodInfos := q.activeQ.List()
+				if len(rawPodInfos) != len(test.wantActiveQPodNames) {
+					diff := cmp.Diff(test.wantActiveQPodNames, rawPodInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
+						return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name
+					}))
+					t.Fatalf("Unexpected length of activeQ. Got %v, want %v.\n%s", len(rawPodInfos), len(test.wantActiveQPodNames), diff)
+				}
+
+				wantPodNames := sets.New(test.wantActiveQPodNames...)
+				for _, rawPodInfo := range rawPodInfos {
+					podGotFromActiveQ := rawPodInfo.(*framework.QueuedPodInfo).Pod
+					if !wantPodNames.Has(podGotFromActiveQ.Name) {
+						t.Fatalf("Pod %v was not expected to be in the activeQ.", podGotFromActiveQ.Name)
+					}
+				}
+			}
+
+			if test.wantBackoffQPodNames != nil {
+				rawPodInfos := q.podBackoffQ.List()
+				if len(rawPodInfos) != len(test.wantBackoffQPodNames) {
+					diff := cmp.Diff(test.wantBackoffQPodNames, rawPodInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
+						return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name
+					}))
+					t.Fatalf("Unexpected length of backoffQ. Got %v, want %v.\n%s", len(rawPodInfos), len(test.wantBackoffQPodNames), diff)
+				}
+
+				wantPodNames := sets.New(test.wantBackoffQPodNames...)
+				for _, rawPodInfo := range rawPodInfos {
+					podGotFromBackoffQ := rawPodInfo.(*framework.QueuedPodInfo).Pod
+					if !wantPodNames.Has(podGotFromBackoffQ.Name) {
+						t.Fatalf("Pod %v was not expected to be in the backoffQ.", podGotFromBackoffQ.Name)
+					}
+				}
+			}
+
+			for _, podName := range test.wantUnschedPodPoolPodNames {
+				p := getUnschedulablePod(q, &st.MakePod().Name(podName).Pod)
+				if p == nil {
+					t.Fatalf("Pod %v was not found in the unschedulablePods.", podName)
+				}
+			}
+		})
+	}
+}
+
 func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
 	objs := []runtime.Object{highPriNominatedPodInfo.Pod, unschedulablePodInfo.Pod}
 	logger, ctx := ktesting.NewTestContext(t)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
+	// Insert unschedulablePodInfo and pop it right after that,
+	// because the scheduling queue records unschedulablePod as an in-flight Pod.
+	q.Add(logger, unschedulablePodInfo.Pod)
+	if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
+	}
+
 	q.Add(logger, highPriNominatedPodInfo.Pod)
-	q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything.
 	q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle())
 	expectedNominatedPods := &nominator{
 		nominatedPodToNode: map[types.UID]string{
@@ -202,6 +646,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
 	if len(q.nominator.nominatedPods) != 1 {
 		t.Errorf("Expected nomindatePods to have one element: %v", q.nominator)
 	}
+	// unschedulablePodInfo is inserted into the unschedulable pod pool because no events happened during scheduling.
 	if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod {
 		t.Errorf("Pod %v was not found in the unschedulablePods.", unschedulablePodInfo.Pod.Name)
 	}
@@ -235,7 +680,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
 	}
 
 	// move all pods to active queue when we were trying to schedule them
-	q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil)
+	q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil)
 	oldCycle := q.SchedulingCycle()
 
 	firstPod, _ := q.Pop()
@@ -298,6 +743,7 @@ func TestPriorityQueue_Update(t *testing.T) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs, WithClock(c))
+	// Add highPriorityPodInfo to activeQ.
 	q.Update(logger, nil, highPriorityPodInfo.Pod)
 	if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists {
 		t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name)
@@ -344,8 +790,11 @@ func TestPriorityQueue_Update(t *testing.T) {
 		t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, podGotFromBackoffQ.Name)
 	}
 
-	// updating a pod which is in unschedulable queue, and it is still backing off,
-	// we will move it to backoff queue
+	// To simulate the pod failing scheduling in the real world, Pop() the pod from activeQ before testing AddUnschedulableIfNotPresent.
+	q.activeQ.Add(podInfo)
+	if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
+	}
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(medPriorityPodInfo.Pod), q.SchedulingCycle())
 	if len(q.unschedulablePods.podInfoMap) != 1 {
 		t.Error("Expected unschedulablePods to be 1.")
@@ -739,6 +1188,10 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
 			cl := testingclock.NewFakeClock(now)
 			q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
 			// add to unsched pod pool
+			q.activeQ.Add(q.newQueuedPodInfo(test.podInfo.Pod))
+			if p, err := q.Pop(); err != nil || p.Pod != test.podInfo.Pod {
+				t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
+			}
 			q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle())
 
 			cl.Step(test.duration)
@@ -773,22 +1226,43 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 		},
 	}
 	q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
-	q.Add(logger, medPriorityPodInfo.Pod)
+	// To simulate pods failing scheduling in the real world, Pop() each pod from activeQ before the AddUnschedulableIfNotPresent() calls below.
+	q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
+	if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
+	}
+	q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
+	if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
+	}
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 
 	// Construct a Pod, but don't associate its scheduler failure to any plugin
 	hpp1 := highPriorityPodInfo.Pod.DeepCopy()
 	hpp1.Name = "hpp1"
+	q.activeQ.Add(q.newQueuedPodInfo(hpp1))
+	if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
+		t.Errorf("Expected: %v after Pop, but got: %v", hpp1.Name, p.Pod.Name)
+	}
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
 
 	// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
 	hpp2 := highPriorityPodInfo.Pod.DeepCopy()
 	hpp2.Name = "hpp2"
+	q.activeQ.Add(q.newQueuedPodInfo(hpp2))
+	if p, err := q.Pop(); err != nil || p.Pod != hpp2 {
+		t.Errorf("Expected: %v after Pop, but got: %v", hpp2.Name, p.Pod.Name)
+	}
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
 	// Pods is still backing off, move the pod into backoffQ.
 	q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
+	q.Add(logger, medPriorityPodInfo.Pod)
 	if q.activeQ.Len() != 1 {
 		t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
 	}
+	// Pop medPriorityPodInfo out of activeQ.
+	if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
+	}
 	// hpp2 won't be moved.
 	if q.podBackoffQ.Len() != 3 {
 		t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
@@ -800,9 +1274,22 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 	}
 
 	q.schedulingCycle++
+	q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
+	if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
+	}
+	q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
+	if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
+	}
+	q.activeQ.Add(q.newQueuedPodInfo(hpp1))
+	if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
+		t.Errorf("Expected: %v after Pop, but got: %v", hpp1.Name, p.Pod.Name)
+	}
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
+	q.Add(logger, medPriorityPodInfo.Pod)
 	for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp1, hpp2} {
 		if q.unschedulablePods.get(pod) == nil {
 			t.Errorf("Expected %v in the unschedulablePods", pod.Name)
@@ -829,7 +1316,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	affinityPod := st.MakePod().Name("afp").Namespace("ns1").UID("upns1").Annotation("annot2", "val2").Priority(mediumPriority).NominatedNodeName("node1").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj()
+	affinityPod := st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Priority(mediumPriority).NominatedNodeName("node1").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj()
 	labelPod := st.MakePod().Name("lbp").Namespace(affinityPod.Namespace).Label("service", "securityscan").Node("node1").Obj()
 
 	c := testingclock.NewFakeClock(time.Now())
@@ -841,8 +1328,16 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
 		},
 	}
 	q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
+	// To simulate pods failing scheduling in the real world, Pop() each pod from activeQ before the AddUnschedulableIfNotPresent() calls below.
+	q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
+	if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
+	}
+	q.activeQ.Add(q.newQueuedPodInfo(affinityPod))
+	if p, err := q.Pop(); err != nil || p.Pod != affinityPod {
+		t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name)
+	}
 	q.Add(logger, medPriorityPodInfo.Pod)
-	// Add a couple of pods to the unschedulablePods.
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(affinityPod, "fakePlugin"), q.SchedulingCycle())
 
@@ -955,6 +1450,15 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	q := NewTestQueue(ctx, newDefaultQueueSort())
+	// To simulate pods failing scheduling in the real world, Pop() each pod from activeQ before the AddUnschedulableIfNotPresent() calls below.
+	q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
+	if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
+	}
+	q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
+	if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
+		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
+	}
 	q.Add(logger, medPriorityPodInfo.Pod)
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle())
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle())
@@ -1287,6 +1791,11 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
 		Message: "fake scheduling failure",
 	})
 
+	// To simulate the pod failing scheduling in the real world, Pop() the pod from activeQ before the AddUnschedulableIfNotPresent() call below.
+	q.activeQ.Add(q.newQueuedPodInfo(unschedulablePod))
+	if p, err := q.Pop(); err != nil || p.Pod != unschedulablePod {
+		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
+	}
 	// Put in the unschedulable queue
 	q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePod), q.SchedulingCycle())
 	// Move clock to make the unschedulable pods complete backoff.
@@ -1411,6 +1920,15 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
 		Message: "fake scheduling failure",
 	})
 
+	// To simulate pods failing scheduling in the real world, Pop() each pod from activeQ before the AddUnschedulableIfNotPresent() calls below.
+	q.activeQ.Add(q.newQueuedPodInfo(highPod))
+	if p, err := q.Pop(); err != nil || p.Pod != highPod {
+		t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name)
+	}
+	q.activeQ.Add(q.newQueuedPodInfo(midPod))
+	if p, err := q.Pop(); err != nil || p.Pod != midPod {
+		t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name)
+	}
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPod, "fakePlugin"), q.SchedulingCycle())
 	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(midPod, "fakePlugin"), q.SchedulingCycle())
 	c.Step(DefaultPodMaxInUnschedulablePodsDuration + time.Second)
@@ -1516,10 +2034,16 @@ var (
 		queue.Add(logger, pInfo.Pod)
 	}
 	addUnschedulablePodBackToUnschedulablePods = func(logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
-		queue.AddUnschedulableIfNotPresent(logger, pInfo, 0)
+		// To simulate the pod failing scheduling in the real world, Pop() the pod from activeQ before the AddUnschedulableIfNotPresent() call below.
+		queue.activeQ.Add(queue.newQueuedPodInfo(pInfo.Pod))
+		if p, err := queue.Pop(); err != nil || p.Pod != pInfo.Pod {
+			panic(fmt.Sprintf("Expected: %v after Pop, but got: %v", pInfo.Pod.Name, p.Pod.Name))
+		}
+
+		queue.AddUnschedulableIfNotPresent(logger, pInfo, 1)
 	}
 	addUnschedulablePodBackToBackoffQ = func(logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
-		queue.AddUnschedulableIfNotPresent(logger, pInfo, -1)
+		queue.AddUnschedulableIfNotPresent(logger, pInfo, 1)
 	}
 	addPodActiveQ = func(logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
 		queue.activeQ.Add(pInfo)
@@ -2050,13 +2574,15 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
 
 func TestIncomingPodsMetrics(t *testing.T) {
 	timestamp := time.Now()
+	unschedulablePlg := "unschedulable_plugin"
 	metrics.Register()
 	var pInfos = make([]*framework.QueuedPodInfo, 0, 3)
 	for i := 1; i <= 3; i++ {
 		p := &framework.QueuedPodInfo{
 			PodInfo:   mustNewTestPodInfo(t, st.MakePod().Name(fmt.Sprintf("test-pod-%d", i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()),
-			Timestamp: timestamp,
+			Timestamp:            timestamp,
+			UnschedulablePlugins: sets.New(unschedulablePlg),
 		}
 		pInfos = append(pInfos, p)
 	}
@@ -2261,6 +2787,11 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
 			defer cancel()
 			q := NewTestQueue(ctx, newDefaultQueueSort())
 			for i, podInfo := range tt.podInfos {
+				// To simulate the pod failing scheduling in the real world, Pop() the pod from activeQ before the AddUnschedulableIfNotPresent() call below.
+				q.activeQ.Add(q.newQueuedPodInfo(podInfo.Pod))
+				if p, err := q.Pop(); err != nil || p.Pod != podInfo.Pod {
+					t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
+				}
 				q.AddUnschedulableIfNotPresent(logger, podInfo, q.schedulingCycle)
 				// NOTE: On Windows, time.Now() is not as precise, 2 consecutive calls may return the same timestamp,
 				// resulting in 0 time delta / latency. This will cause the pods to be backed off in a random
diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go
index 3b0610b2543..02320d2300f 100644
--- a/pkg/scheduler/schedule_one.go
+++ b/pkg/scheduler/schedule_one.go
@@ -66,12 +66,19 @@ const (
 // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
 func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	logger := klog.FromContext(ctx)
-	podInfo := sched.NextPod()
+	podInfo, err := sched.NextPod()
+	if err != nil {
+		logger.Error(err, "Error while retrieving next pod from scheduling queue")
+		return
+	}
 	// pod could be nil when schedulerQueue is closed
 	if podInfo == nil || podInfo.Pod == nil {
 		return
 	}
+
 	pod := podInfo.Pod
+	logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(pod))
+
 	fwk, err := sched.frameworkForPod(pod)
 	if err != nil {
 		// This shouldn't happen, because we only accept for scheduling the pods
@@ -115,6 +122,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		if !status.IsSuccess() {
 			sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
 		}
+		// Usually, Done is called inside the scheduling queue,
+		// but in this case, we need to call it here because this Pod won't go back to the scheduling queue.
+		sched.SchedulingQueue.Done(assumedPodInfo.Pod.UID)
 	}()
 }
 
@@ -922,6 +932,16 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
 // handleSchedulingFailure records an event for the pod that indicates the
 // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
 func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
+	calledDone := false
+	defer func() {
+		if !calledDone {
+			// Basically, AddUnschedulableIfNotPresent calls Done internally.
+			// But AddUnschedulableIfNotPresent isn't called in some corner cases.
+			// Here, we call Done explicitly to avoid leaking the pod.
+			sched.SchedulingQueue.Done(podInfo.Pod.UID)
+		}
+	}()
+
 	logger := klog.FromContext(ctx)
 	reason := v1.PodReasonSchedulerError
 	if status.IsUnschedulable() {
@@ -967,11 +987,13 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
 		cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
 		if e != nil {
 			logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
+			// The deferred Done call above releases the pod, because we don't call AddUnschedulableIfNotPresent in this case.
 		} else {
 			// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
 			// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
 			if len(cachedPod.Spec.NodeName) != 0 {
 				logger.Info("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
+				// The deferred Done call above releases the pod, because we don't call AddUnschedulableIfNotPresent in this case.
 			} else {
 				// As is from SharedInformer, we need to do a DeepCopy() here.
 				// ignore this err since apiserver doesn't properly validate affinity terms
@@ -980,6 +1002,7 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
 				if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
 					logger.Error(err, "Error occurred")
 				}
+				calledDone = true
 			}
 		}
diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go
index c9fb77ba97a..c3084345a5c 100644
--- a/pkg/scheduler/schedule_one_test.go
+++ b/pkg/scheduler/schedule_one_test.go
@@ -765,8 +765,8 @@ func TestSchedulerScheduleOne(t *testing.T) {
 			sched := &Scheduler{
 				Cache:  cache,
 				client: client,
-				NextPod: func() *framework.QueuedPodInfo {
-					return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}
+				NextPod: func() (*framework.QueuedPodInfo, error) {
+					return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil
 				},
 				SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
 				Profiles:        profile.Map{testSchedulerName: fwk},
@@ -3228,8 +3228,8 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 		client:                   client,
 		nodeInfoSnapshot:         internalcache.NewEmptySnapshot(),
 		percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
-		NextPod: func() *framework.QueuedPodInfo {
-			return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, clientcache.Pop(queuedPodStore).(*v1.Pod))}
+		NextPod: func() (*framework.QueuedPodInfo, error) {
+			return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, clientcache.Pop(queuedPodStore).(*v1.Pod))}, nil
 		},
 		SchedulingQueue: schedulingQueue,
 		Profiles:        profile.Map{testSchedulerName: fwk},
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 3b0d761a89d..60d345af046 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -71,7 +71,7 @@ type Scheduler struct {
 	// is available. We don't use a channel for this, because scheduling
 	// a pod may take some amount of time and we don't want pods to get
 	// stale while they sit in a channel.
-	NextPod func() *framework.QueuedPodInfo
+	NextPod func() (*framework.QueuedPodInfo, error)
 
 	// FailureHandler is called upon a scheduling failure.
 	FailureHandler FailureHandlerFn
@@ -346,12 +346,12 @@ func New(ctx context.Context,
 		nodeInfoSnapshot:         snapshot,
 		percentageOfNodesToScore: options.percentageOfNodesToScore,
 		Extenders:                extenders,
-		NextPod:                  internalqueue.MakeNextPodFunc(logger, podQueue),
 		StopEverything:           stopEverything,
 		SchedulingQueue:          podQueue,
 		Profiles:                 profiles,
 		logger:                   logger,
 	}
+	sched.NextPod = podQueue.Pop
 	sched.applyDefaultHandlers()
 
 	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go
index f76f9ba21d1..7db8533889d 100644
--- a/test/integration/scheduler/preemption/preemption_test.go
+++ b/test/integration/scheduler/preemption/preemption_test.go
@@ -1477,13 +1477,13 @@ func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...schedule
 	testutils.SyncSchedulerInformerFactory(testCtx)
 	// wraps the NextPod() method to make it appear the preemption has been done already and the nominated node has been set.
 	f := testCtx.Scheduler.NextPod
-	testCtx.Scheduler.NextPod = func() (podInfo *framework.QueuedPodInfo) {
-		podInfo = f()
+	testCtx.Scheduler.NextPod = func() (*framework.QueuedPodInfo, error) {
+		podInfo, _ := f()
 		// Scheduler.Next() may return nil when scheduler is shutting down.
 		if podInfo != nil {
 			podInfo.Pod.Status.NominatedNodeName = "node-1"
 		}
-		return podInfo
+		return podInfo, nil
 	}
 	go testCtx.Scheduler.Run(testCtx.Ctx)
 	return testCtx
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index 4c1f57c8c1e..8d70e0006be 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -1097,7 +1097,7 @@ func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.Queued
 	// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
 	// default go testing timeout (10m) to abort.
 	if err := timeout(testCtx.Ctx, time.Second*5, func() {
-		podInfo = testCtx.Scheduler.NextPod()
+		podInfo, _ = testCtx.Scheduler.NextPod()
 	}); err != nil {
 		t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
 	}
@@ -1112,7 +1112,7 @@ func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodIn
 	// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
 	// default go testing timeout (10m) to abort.
 	if err := timeout(testCtx.Ctx, time.Second*5, func() {
-		podInfo = testCtx.Scheduler.NextPod()
+		podInfo, _ = testCtx.Scheduler.NextPod()
 	}); err != nil {
 		return nil
 	}
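Reviewer note: the interplay between Pop(), Done(), and the receivedEvents list is the heart of this change. Below is a minimal, self-contained sketch of that bookkeeping (a toy model, not the real PriorityQueue; names are stand-ins, and locking, hints, and metrics are omitted):

```go
package main

import (
	"container/list"
	"fmt"
)

// event mirrors clusterEvent: a name plus a reference counter.
type event struct {
	name            string
	inFlightPodsNum int
}

// toyQueue models only the in-flight bookkeeping of PriorityQueue.
type toyQueue struct {
	inFlightPods   map[string]*list.Element // pod UID -> last event seen before Pop
	receivedEvents *list.List               // of *event, oldest first
}

func newToyQueue() *toyQueue {
	return &toyQueue{
		inFlightPods:   map[string]*list.Element{},
		receivedEvents: list.New(),
	}
}

// pop remembers the tail of the event list, so done() later knows which
// events arrived while this pod was being processed.
func (q *toyQueue) pop(uid string) {
	q.inFlightPods[uid] = q.receivedEvents.Back() // nil if no events yet
}

// observe records an event only while someone is in flight; otherwise
// nobody would ever dereference it and the list would leak.
func (q *toyQueue) observe(name string) {
	if len(q.inFlightPods) == 0 {
		return
	}
	q.receivedEvents.PushBack(&event{name: name, inFlightPodsNum: len(q.inFlightPods)})
}

// done dereferences every event that happened after the pod was popped and
// garbage-collects events that no in-flight pod refers to any more.
func (q *toyQueue) done(uid string) {
	prev, ok := q.inFlightPods[uid]
	if !ok {
		return
	}
	delete(q.inFlightPods, uid)

	e := q.receivedEvents.Front()
	if prev != nil && prev.Value.(*event).inFlightPodsNum != 0 {
		// prev is still on the list, so start right after it (case3).
		e = prev.Next()
	}
	for e != nil {
		ev := e.Value.(*event)
		ev.inFlightPodsNum--
		next := e.Next() // take the next element before a possible removal
		if ev.inFlightPodsNum <= 0 {
			q.receivedEvents.Remove(e)
		}
		e = next
	}
}

func main() {
	q := newToyQueue()
	q.pop("pod1")        // pod1 starts with no previousEvent
	q.observe("NodeAdd") // refcount 1 (pod1)
	q.pop("pod2")        // pod2's previousEvent is NodeAdd
	q.observe("PvAdd")   // refcount 2 (pod1, pod2)
	q.done("pod1")       // NodeAdd -> 0 (removed), PvAdd -> 1
	q.done("pod2")       // NodeAdd is gone, so start from Front: PvAdd -> 0
	fmt.Println("remaining events:", q.receivedEvents.Len()) // 0
}
```

The guard on prev's counter in done() is why the real code handles case2 explicitly: once an element has been removed from a container/list, Next() no longer walks the list, so a stale previousEvent has to be detected by its zeroed counter and the scan restarted from the front.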
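The hint-merging rule in determineSchedulingHintForInFlightPod boils down to a small fold over the per-event hints: QueueImmediately short-circuits, QueueAfterBackoff is sticky, and QueueSkip is the default. A stand-alone illustration (local hint constants, not the framework's types):

```go
package main

import "fmt"

type hint int

const (
	queueSkip hint = iota
	queueAfterBackoff
	queueImmediately
)

// mergeHints folds per-event hints the same way the loop in
// determineSchedulingHintForInFlightPod does.
func mergeHints(perEvent []hint) hint {
	merged := queueSkip
	for _, h := range perEvent {
		switch h {
		case queueImmediately:
			return queueImmediately // strongest opinion: stop scanning
		case queueAfterBackoff:
			merged = queueAfterBackoff // sticky, but keep looking for queueImmediately
		}
	}
	return merged
}

func main() {
	fmt.Println(mergeHints([]hint{queueSkip, queueAfterBackoff, queueSkip}))         // 1 (after backoff)
	fmt.Println(mergeHints([]hint{queueSkip, queueImmediately, queueAfterBackoff})) // 2 (immediately)
}
```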
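Finally, the caller-side contract added in schedule_one.go can be summarized as: every Pop() must eventually be paired with exactly one Done(), either explicitly on the binding path or implicitly via AddUnschedulableIfNotPresent on the failure path. A hedged sketch of that invariant (hypothetical fakeQueue, not the scheduler's types):

```go
package main

import "fmt"

// fakeQueue is a stand-in for the scheduling queue; only the in-flight
// bookkeeping is modeled.
type fakeQueue struct{ inFlight map[string]bool }

func (q *fakeQueue) pop(uid string)  { q.inFlight[uid] = true }
func (q *fakeQueue) done(uid string) { delete(q.inFlight, uid) }

// addUnschedulable stands in for AddUnschedulableIfNotPresent, which now
// calls done() internally before requeueing the pod.
func (q *fakeQueue) addUnschedulable(uid string) { q.done(uid) }

// scheduleOne sketches the contract: whichever path an attempt takes,
// exactly one of addUnschedulable (failure) or done (binding) runs.
func scheduleOne(q *fakeQueue, uid string, schedulable bool) {
	q.pop(uid)
	if !schedulable {
		q.addUnschedulable(uid) // the queue releases the pod for us
		return
	}
	// ... assume, permit, bind. The pod never re-enters the queue on this
	// path, so we must release it ourselves or inFlight would leak.
	defer q.done(uid)
	fmt.Println("bound", uid)
}

func main() {
	q := &fakeQueue{inFlight: map[string]bool{}}
	scheduleOne(q, "pod-a", true)
	scheduleOne(q, "pod-b", false)
	fmt.Println("leaked in-flight pods:", len(q.inFlight)) // 0
}
```

This also motivates the calledDone flag in handleSchedulingFailure: the corner cases that skip AddUnschedulableIfNotPresent (pod gone from the informer cache, or already bound by an extender) still release the pod through the deferred Done.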