diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 98367a63d55..2c9043a8d03 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -163,8 +163,17 @@ type PriorityQueue struct {
 	// the maximum time a pod can stay in the unschedulablePods.
 	podMaxInUnschedulablePodsDuration time.Duration
 
+	// cond is a condition that is notified when the pod is added to activeQ.
+	// It is used with activeQLock.
 	cond sync.Cond
 
+	// activeQLock synchronizes all operations related to activeQ.
+	// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
+	// Caution: DO NOT take nominator.lock after taking activeQLock,
+	// you should take nominator.lock first if you need two locks,
+	// otherwise the queue could end up in a deadlock.
+	activeQLock sync.RWMutex
+
 	// inFlightPods holds the UID of all pods which have been popped out for which Done
 	// hasn't been called yet - in other words, all pods that are currently being
 	// processed (being scheduled, in permit, or in the binding cycle).
@@ -172,6 +181,8 @@ type PriorityQueue struct {
 	// The values in the map are the entry of each pod in the inFlightEvents list.
 	// The value of that entry is the *v1.Pod at the time that scheduling of that
 	// pod started, which can be useful for logging or debugging.
+	//
+	// It should be protected by activeQLock.
 	inFlightPods map[types.UID]*list.Element
 
 	// inFlightEvents holds the events received by the scheduling queue
@@ -187,10 +198,12 @@ type PriorityQueue struct {
 	// After removal of a pod, events at the start of the list are no
 	// longer needed because all of the other in-flight pods started
 	// later. Those events can be removed.
+	//
+	// It should be protected by activeQLock.
 	inFlightEvents *list.List
 
 	// activeQ is heap structure that scheduler actively looks at to find pods to
-	// schedule. Head of heap is the highest priority pod.
+	// schedule. Head of heap is the highest priority pod. It should be protected by activeQLock.
 	activeQ *heap.Heap
 	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
 	// are popped from this heap before the scheduler looks at activeQ
@@ -199,6 +212,7 @@ type PriorityQueue struct {
 	unschedulablePods *UnschedulablePods
 	// schedulingCycle represents sequence number of scheduling cycle and is incremented
 	// when a pod is popped.
+	// It should be protected by activeQLock.
 	schedulingCycle int64
 	// moveRequestCycle caches the sequence number of scheduling cycle when we
 	// received a move request. Unschedulable pods in and before this scheduling
@@ -214,6 +228,7 @@ type PriorityQueue struct {
 
 	// closed indicates that the queue is closed.
 	// It is mainly used to let Pop() exit its control loop while waiting for an item.
+	// It should be protected by activeQLock.
 	closed bool
 
 	nsLister listersv1.NamespaceLister
@@ -383,7 +398,7 @@ func NewPriorityQueue(
 		moveRequestCycle:             -1,
 		isSchedulingQueueHintEnabled: utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
 	}
-	pq.cond.L = &pq.lock
+	pq.cond.L = &pq.activeQLock
 	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
 	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
 
@@ -555,13 +570,24 @@ func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.Pr
 	return s
 }
 
-// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
+// moveToActiveQ tries to add pod to active queue and remove it from unschedulable and backoff queues.
+// It returns 2 parameters:
 // 1. a boolean flag to indicate whether the pod is added successfully.
 // 2. an error for the caller to act on.
-func (p *PriorityQueue) addToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo) (bool, error) {
+func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) (bool, error) {
+	gatedBefore := pInfo.Gated
 	pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
+
+	p.activeQLock.Lock()
+	defer p.activeQLock.Unlock()
 	if pInfo.Gated {
 		// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
+		if _, exists, _ := p.activeQ.Get(pInfo); exists {
+			return false, nil
+		}
+		if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
+			return false, nil
+		}
 		p.unschedulablePods.addOrUpdate(pInfo)
 		return false, nil
 	}
@@ -569,10 +595,20 @@ func (p *PriorityQueue) addToActiveQ(logger klog.Logger, pInfo *framework.Queued
 		now := p.clock.Now()
 		pInfo.InitialAttemptTimestamp = &now
 	}
+
 	if err := p.activeQ.Add(pInfo); err != nil {
 		logger.Error(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
 		return false, err
 	}
+
+	p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
+	_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
+	logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
+	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
+	if event == PodAdd || event == PodUpdate {
+		p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
+	}
+
 	return true, nil
 }
 
@@ -583,21 +619,9 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error {
 	defer p.lock.Unlock()
 
 	pInfo := p.newQueuedPodInfo(pod)
-	gated := pInfo.Gated
-	if added, err := p.addToActiveQ(logger, pInfo); !added {
+	if added, err := p.moveToActiveQ(logger, pInfo, PodAdd); !added {
 		return err
 	}
-	if p.unschedulablePods.get(pod) != nil {
-		logger.Error(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
-		p.unschedulablePods.delete(pod, gated)
-	}
-	// Delete pod from backoffQ if it is backing off
-	if err := p.podBackoffQ.Delete(pInfo); err == nil {
-		logger.Error(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
-	}
-	logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQ)
-	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
-	p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
 	p.cond.Broadcast()
 
 	return nil
@@ -620,9 +644,16 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
 	}
 }
 
+func (p *PriorityQueue) existsInActiveQ(pInfo *framework.QueuedPodInfo) bool {
+	p.activeQLock.RLock()
+	defer p.activeQLock.RUnlock()
+	_, exists, _ := p.activeQ.Get(pInfo)
+	return exists
+}
+
 func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
 	// Verify if the pod is present in activeQ.
-	if _, exists, _ := p.activeQ.Get(newQueuedPodInfoForLookup(pod)); exists {
+	if p.existsInActiveQ(newQueuedPodInfoForLookup(pod)) {
 		// No need to activate if it's already present in activeQ.
 		return false
 	}
 
@@ -644,15 +675,8 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
 		return false
 	}
 
-	gated := pInfo.Gated
-	if added, _ := p.addToActiveQ(logger, pInfo); !added {
-		return false
-	}
-	p.unschedulablePods.delete(pInfo.Pod, gated)
-	p.podBackoffQ.Delete(pInfo)
-	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
-	p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
-	return true
+	added, _ := p.moveToActiveQ(logger, pInfo, ForceActivate)
+	return added
 }
 
 // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
@@ -667,14 +691,29 @@ func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
 
 // SchedulingCycle returns current scheduling cycle.
 func (p *PriorityQueue) SchedulingCycle() int64 {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.activeQLock.RLock()
+	defer p.activeQLock.RUnlock()
 	return p.schedulingCycle
 }
 
-// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
-// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
-func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy {
+// clusterEventsSinceElementUnlocked gets all cluster events that have happened while this inFlightPod is being scheduled.
+// Note: this function assumes that the caller holds activeQLock.
+func (p *PriorityQueue) clusterEventsSinceElementUnlocked(inFlightPod *list.Element) []*clusterEvent {
+	var events []*clusterEvent
+	for event := inFlightPod.Next(); event != nil; event = event.Next() {
+		e, ok := event.Value.(*clusterEvent)
+		if !ok {
+			// Must be another in-flight Pod (*v1.Pod). Can be ignored.
+			continue
+		}
+		events = append(events, e)
+	}
+	return events
+}
+
+func (p *PriorityQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) {
+	p.activeQLock.RLock()
+	defer p.activeQLock.RUnlock()
 	logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods))
 
 	// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
@@ -682,7 +721,18 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
 	// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
 	inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
 	if !ok {
-		logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
+		return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
+	}
+
+	return p.clusterEventsSinceElementUnlocked(inFlightPod), nil
+}
+
+// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
+// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
+func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy {
+	events, err := p.clusterEventsForPod(logger, pInfo)
+	if err != nil {
+		logger.Error(err, "Error getting cluster events for pod", "pod", klog.KObj(pInfo.Pod))
 		return queueAfterBackoff
 	}
 
@@ -696,12 +746,7 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
 	// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
 	queueingStrategy := queueSkip
 
-	for event := inFlightPod.Next(); event != nil; event = event.Next() {
-		e, ok := event.Value.(*clusterEvent)
-		if !ok {
-			// Must be another in-flight Pod (*v1.Pod). Can be ignored.
-			continue
-		}
+	for _, e := range events {
 		logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label)
 
 		switch p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj) {
@@ -775,14 +820,14 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
 	defer p.lock.Unlock()
 
 	// In any case, this Pod will be moved back to the queue and we should call Done.
-	defer p.done(pInfo.Pod.UID)
+	defer p.Done(pInfo.Pod.UID)
 
 	pod := pInfo.Pod
 	if p.unschedulablePods.get(pod) != nil {
 		return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
 	}
 
-	if _, exists, _ := p.activeQ.Get(pInfo); exists {
+	if p.existsInActiveQ(pInfo) {
 		return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
 	}
 	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
@@ -839,9 +884,7 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
 			logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
 			break
 		}
-		if added, _ := p.addToActiveQ(logger, pInfo); added {
-			logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQ)
-			metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
+		if added, _ := p.moveToActiveQ(logger, pInfo, BackoffComplete); added {
 			activated = true
 		}
 	}
@@ -874,9 +917,11 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
 // Pop removes the head of the active queue and returns it. It blocks if the
 // activeQ is empty and waits until a new item is added to the queue. It
 // increments scheduling cycle when a pod is popped.
+// Note: This method should NOT take the p.lock at any moment,
+// as it would lead to scheduling throughput degradation.
 func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+	p.activeQLock.Lock()
+	defer p.activeQLock.Unlock()
 	for p.activeQ.Len() == 0 {
 		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
 		// When Close() is called, the p.closed is set and the condition is broadcast,
@@ -912,8 +957,8 @@ func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error
 
 // Done must be called for pod returned by Pop. This allows the queue to
 // keep track of which pods are currently being processed.
 func (p *PriorityQueue) Done(pod types.UID) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+	p.activeQLock.Lock()
+	defer p.activeQLock.Unlock()
 	p.done(pod)
 }
 
@@ -972,6 +1017,17 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
 	return !reflect.DeepEqual(strip(oldPod), strip(newPod))
 }
 
+func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) (bool, error) {
+	p.activeQLock.Lock()
+	defer p.activeQLock.Unlock()
+	if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
+		pInfo := updatePod(oldPodInfo, newPod)
+		p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
+		return true, p.activeQ.Update(pInfo)
+	}
+	return false, nil
+}
+
 // Update updates a pod in the active or backoff queue if present. Otherwise, it removes
 // the item from the unschedulable queue if pod is updated in a way that it may
 // become schedulable and adds the updated one to the active queue.
@@ -981,6 +1037,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 	defer p.lock.Unlock()
 
 	if p.isSchedulingQueueHintEnabled {
+		p.activeQLock.Lock()
 		// the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
 		if _, ok := p.inFlightPods[newPod.UID]; ok {
 			logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod))
@@ -995,17 +1052,17 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 				newObj: newPod,
 			})
+			p.activeQLock.Unlock()
 			return nil
 		}
+		p.activeQLock.Unlock()
 	}
 
 	if oldPod != nil {
 		oldPodInfo := newQueuedPodInfoForLookup(oldPod)
 		// If the pod is already in the active queue, just update it there.
-		if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
-			pInfo := updatePod(oldPodInfo, newPod)
-			p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
-			return p.activeQ.Update(pInfo)
+		if exists, err := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists {
+			return err
 		}
 
 		// If the pod is in the backoff queue, update it there.
@@ -1048,11 +1105,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 				return nil
 			}
 
-			if added, err := p.addToActiveQ(logger, pInfo); !added {
+			if added, err := p.moveToActiveQ(logger, pInfo, BackoffComplete); !added {
 				return err
 			}
-			p.unschedulablePods.delete(usPodInfo.Pod, gated)
-			logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQ)
 			p.cond.Broadcast()
 			return nil
 		}
@@ -1063,11 +1118,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
 	}
 	// If pod is not in any of the queues, we put it in the active queue.
 	pInfo := p.newQueuedPodInfo(newPod)
-	if added, err := p.addToActiveQ(logger, pInfo); !added {
+	if added, err := p.moveToActiveQ(logger, pInfo, PodUpdate); !added {
 		return err
 	}
-	p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
-	logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", activeQ)
 	p.cond.Broadcast()
 	return nil
 }
@@ -1079,6 +1132,8 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 	defer p.lock.Unlock()
 	p.deleteNominatedPodIfExistsUnlocked(pod)
 	pInfo := newQueuedPodInfoForLookup(pod)
+	p.activeQLock.Lock()
+	defer p.activeQLock.Unlock()
 	if err := p.activeQ.Delete(pInfo); err != nil {
 		// The item was probably not found in the activeQ.
 		p.podBackoffQ.Delete(pInfo)
@@ -1186,12 +1241,11 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
 
 	// Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off.
-	added, err := p.addToActiveQ(logger, pInfo)
+	added, err := p.moveToActiveQ(logger, pInfo, event)
 	if err != nil {
 		logger.Error(err, "Error adding pod to the active queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
 	}
 
 	if added {
-		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
 		return activeQ
 	}
 	if pInfo.Gated {
@@ -1247,6 +1301,8 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
 		}
 	}
 
+	p.activeQLock.Lock()
+	defer p.activeQLock.Unlock()
 	p.moveRequestCycle = p.schedulingCycle
 
 	if p.isSchedulingQueueHintEnabled && len(p.inFlightPods) != 0 {
@@ -1293,10 +1349,9 @@ func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Lo
 }
 
 // PodsInActiveQ returns all the Pods in the activeQ.
-// This function is only used in tests.
 func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
+	p.activeQLock.RLock()
+	defer p.activeQLock.RUnlock()
 	var result []*v1.Pod
 	for _, pInfo := range p.activeQ.List() {
 		result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
 	}
@@ -1312,17 +1367,15 @@ var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v"
 func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	var result []*v1.Pod
-	for _, pInfo := range p.activeQ.List() {
-		result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
-	}
+	result := p.PodsInActiveQ()
+	activeQLen := len(result)
 	for _, pInfo := range p.podBackoffQ.List() {
 		result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
 	}
 	for _, pInfo := range p.unschedulablePods.podInfoMap {
 		result = append(result, pInfo.Pod)
 	}
-	return result, fmt.Sprintf(pendingPodsSummary, p.activeQ.Len(), p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap))
+	return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap))
 }
 
 // Close closes the priority queue.
@@ -1330,7 +1383,10 @@ func (p *PriorityQueue) Close() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	close(p.stop)
+	p.activeQLock.Lock()
+	// closed field is protected by activeQLock, as it is checked in Pop() without p.lock held.
 	p.closed = true
+	p.activeQLock.Unlock()
 	p.cond.Broadcast()
 }
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go
index 88131fa3017..edd57a61f39 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -1405,7 +1405,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
 			m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
 			q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m), WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
-			got, _ := q.addToActiveQ(logger, q.newQueuedPodInfo(tt.pod))
+			got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), PodAdd)
 			if got != tt.wantSuccess {
 				t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
 			}
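Reviewer note (not part of the patch): the sketch below isolates the locking pattern this change introduces, for anyone reviewing the lock-ordering comments above. It is a minimal, hypothetical example — twoLockQueue and its fields are made-up names, not scheduler code — showing an outer lock for general bookkeeping, a narrow activeQLock-style inner lock that the condition variable is bound to, and a Pop that blocks while holding only the inner lock. The ordering rule mirrors the patch: when both locks are needed, take the outer lock first and never acquire it while already holding the inner one.

package main

import (
	"container/list"
	"fmt"
	"sync"
)

type twoLockQueue struct {
	lock sync.RWMutex // outer lock: everything except the active queue

	activeQLock sync.RWMutex // inner lock: protects activeQ and closed
	cond        sync.Cond    // signaled when an item lands in activeQ; bound to activeQLock
	activeQ     *list.List
	closed      bool
}

func newTwoLockQueue() *twoLockQueue {
	q := &twoLockQueue{activeQ: list.New()}
	q.cond.L = &q.activeQLock
	return q
}

// Add takes the outer lock first, then the inner lock only for the brief
// activeQ mutation -- the same ordering the patch's comments require.
func (q *twoLockQueue) Add(item string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.activeQLock.Lock()
	q.activeQ.PushBack(item)
	q.activeQLock.Unlock()
	q.cond.Broadcast()
}

// Pop holds only the inner lock, so a blocked Pop never delays callers
// that need just the outer lock.
func (q *twoLockQueue) Pop() (string, bool) {
	q.activeQLock.Lock()
	defer q.activeQLock.Unlock()
	for q.activeQ.Len() == 0 {
		if q.closed {
			return "", false
		}
		q.cond.Wait() // releases activeQLock while waiting
	}
	front := q.activeQ.Front()
	q.activeQ.Remove(front)
	return front.Value.(string), true
}

// Close sets closed under the inner lock because Pop reads it without the
// outer lock, then wakes any waiter so it can observe the flag.
func (q *twoLockQueue) Close() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.activeQLock.Lock()
	q.closed = true
	q.activeQLock.Unlock()
	q.cond.Broadcast()
}

func main() {
	q := newTwoLockQueue()
	q.Add("pod-a")
	if item, ok := q.Pop(); ok {
		fmt.Println("popped:", item)
	}
	q.Close()
}

The point of binding cond to the inner lock is that Pop() can wait for work without serializing against event handlers that only touch the backoff or unschedulable queues, which is the throughput motivation stated in the new Pop() comment.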