From 8e630a9f68633a84285303135f49a9c8f1a44725 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Skocze=C5=84?=
Date: Mon, 22 Jul 2024 10:13:13 +0000
Subject: [PATCH] Move activeQ related fields to separate struct in scheduling queue

---
 pkg/scheduler/internal/queue/active_queue.go | 337 ++++++++++++++++++
 .../internal/queue/scheduling_queue.go | 287 +++------------
 .../internal/queue/scheduling_queue_test.go | 153 ++++----
 3 files changed, 465 insertions(+), 312 deletions(-)
 create mode 100644 pkg/scheduler/internal/queue/active_queue.go

diff --git a/pkg/scheduler/internal/queue/active_queue.go b/pkg/scheduler/internal/queue/active_queue.go
new file mode 100644
index 00000000000..91149bf5752
--- /dev/null
+++ b/pkg/scheduler/internal/queue/active_queue.go
@@ -0,0 +1,337 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"container/list"
+	"fmt"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/internal/heap"
+	"k8s.io/kubernetes/pkg/scheduler/metrics"
+)
+
+// activeQueuer is a wrapper for activeQ related operations.
+// Its methods, except the "unlocked" ones, take the lock internally.
+// Note: be careful when using the unlocked() methods.
+// The lock returned by getLock() should be used only to protect the unlocked() methods,
+// and it is forbidden to call any other activeQueuer method while holding it.
+type activeQueuer interface {
+	getLock() *sync.RWMutex
+	unlocked() unlockedActiveQueuer
+
+	pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
+	list() []*v1.Pod
+	len() int
+	has(pInfo *framework.QueuedPodInfo) bool
+
+	listInFlightEvents() []interface{}
+	listInFlightPods() []*v1.Pod
+	clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error)
+	addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool
+	addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool
+
+	schedulingCycle() int64
+	done(pod types.UID)
+	close()
+	broadcast()
+}
+
+// unlockedActiveQueuer defines the activeQ methods that do not take the lock themselves.
+// The lock returned by getLock() must be held while calling them.
+type unlockedActiveQueuer interface {
+	Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
+	Has(pInfo *framework.QueuedPodInfo) bool
+	AddOrUpdate(pInfo *framework.QueuedPodInfo)
+	Delete(pInfo *framework.QueuedPodInfo) error
+}
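+
+// Illustrative usage of the getLock()/unlocked() pair (a sketch mirroring how
+// PriorityQueue uses it later in this patch; "lookup" is a placeholder name,
+// not a real variable):
+//
+//	p.activeQ.getLock().Lock()
+//	defer p.activeQ.getLock().Unlock()
+//	if pInfo, exists := p.activeQ.unlocked().Get(lookup); exists {
+//		p.activeQ.unlocked().AddOrUpdate(pInfo)
+//	}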
+
+// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
+type activeQueue struct {
+	// lock synchronizes all operations related to activeQ.
+	// It protects the queue, inFlightPods, inFlightEvents, schedCycle and closed fields.
+	// Caution: DO NOT take "SchedulingQueue.lock" after taking "lock".
+	// You should always take "SchedulingQueue.lock" first, otherwise the queue could end up in deadlock.
+	// "lock" should not be taken after taking "nLock".
+	// Correct locking order is: SchedulingQueue.lock > lock > nominator.nLock.
+	lock sync.RWMutex
+
+	// queue is the heap structure that the scheduler actively looks at to find pods to
+	// schedule. Head of the heap is the highest priority pod.
+	queue *heap.Heap[*framework.QueuedPodInfo]
+
+	// cond is a condition that is notified when a pod is added to activeQ.
+	// It is used with lock.
+	cond sync.Cond
+
+	// inFlightPods holds the UIDs of all pods which have been popped out and for which done()
+	// hasn't been called yet - in other words, all pods that are currently being
+	// processed (being scheduled, in permit, or in the binding cycle).
+	//
+	// The values in the map are the entry of each pod in the inFlightEvents list.
+	// The value of that entry is the *v1.Pod at the time that scheduling of that
+	// pod started, which can be useful for logging or debugging.
+	inFlightPods map[types.UID]*list.Element
+
+	// inFlightEvents holds the events received by the scheduling queue
+	// (entry value is clusterEvent) together with in-flight pods (entry
+	// value is *v1.Pod). Entries get added at the end while the mutex is
+	// locked, so they get serialized.
+	//
+	// The pod entries are added in pop() and used to track which events
+	// occurred after the scheduling attempt for that pod started.
+	// They get removed when the scheduling attempt is done, at which
+	// point all events that occurred in the meantime are processed.
+	//
+	// After removal of a pod, events at the start of the list are no
+	// longer needed because all of the other in-flight pods started
+	// later. Those events can be removed.
+	inFlightEvents *list.List
+
+	// schedCycle represents the sequence number of the scheduling cycle and is incremented
+	// when a pod is popped.
+	schedCycle int64
+
+	// closed indicates that the queue is closed.
+	// It is mainly used to let pop() exit its control loop while waiting for an item.
+	closed bool
+
+	// isSchedulingQueueHintEnabled indicates whether the SchedulerQueueingHints feature gate is enabled.
+	isSchedulingQueueHintEnabled bool
+}
+
+func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool) *activeQueue {
+	aq := &activeQueue{
+		queue:                        queue,
+		inFlightPods:                 make(map[types.UID]*list.Element),
+		inFlightEvents:               list.New(),
+		isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
+	}
+	aq.cond.L = &aq.lock
+
+	return aq
+}
+
+// getLock returns the lock of activeQueue.
+// It should be used only to protect calls to the unlocked() methods.
+func (aq *activeQueue) getLock() *sync.RWMutex {
+	return &aq.lock
+}
+
+// unlocked returns the queue methods that do not take the lock themselves.
+// The lock returned by getLock() must be held while calling them.
+func (aq *activeQueue) unlocked() unlockedActiveQueuer {
+	return aq.queue
+}
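+
+// The pop/done lifecycle below, sketched for illustration (the exported entry
+// points are PriorityQueue.Pop and PriorityQueue.Done, which delegate here):
+//
+//	pInfo, err := aq.pop(logger) // increments schedCycle; registers pInfo.Pod as in-flight (when queueing hints are enabled)
+//	// ... scheduling attempt for pInfo runs ...
+//	aq.done(pInfo.Pod.UID) // clears the in-flight entry and prunes events no other in-flight pod needs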
+
+// pop removes the head of the queue and returns it.
+// It blocks if the queue is empty and waits until a new item is added to the queue.
+// It increments the scheduling cycle when a pod is popped.
+func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
+	aq.lock.Lock()
+	defer aq.lock.Unlock()
+	for aq.queue.Len() == 0 {
+		// When the queue is empty, invocation of pop() is blocked until a new item is enqueued.
+		// When close() is called, aq.closed is set and the condition is broadcast,
+		// which causes this loop to continue and return from pop().
+		if aq.closed {
+			logger.V(2).Info("Scheduling queue is closed")
+			return nil, nil
+		}
+		aq.cond.Wait()
+	}
+	pInfo, err := aq.queue.Pop()
+	if err != nil {
+		return nil, err
+	}
+	pInfo.Attempts++
+	aq.schedCycle++
+	// In flight, no concurrent events yet.
+	if aq.isSchedulingQueueHintEnabled {
+		aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
+	}
+
+	// Update metrics and reset the set of unschedulable plugins for the next attempt.
+	for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {
+		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
+	}
+	pInfo.UnschedulablePlugins.Clear()
+	pInfo.PendingPlugins.Clear()
+
+	return pInfo, nil
+}
+
+// list returns all pods that are in the queue.
+func (aq *activeQueue) list() []*v1.Pod {
+	aq.lock.RLock()
+	defer aq.lock.RUnlock()
+	var result []*v1.Pod
+	for _, pInfo := range aq.queue.List() {
+		result = append(result, pInfo.Pod)
+	}
+	return result
+}
+
+// len returns the length of the queue.
+func (aq *activeQueue) len() int {
+	// Take the read lock for consistency with the other accessors;
+	// queue must not be read while it may be mutated concurrently.
+	aq.lock.RLock()
+	defer aq.lock.RUnlock()
+	return aq.queue.Len()
+}
+
+// has returns whether pInfo exists in the queue.
+func (aq *activeQueue) has(pInfo *framework.QueuedPodInfo) bool {
+	aq.lock.RLock()
+	defer aq.lock.RUnlock()
+	return aq.queue.Has(pInfo)
+}
+
+// listInFlightEvents returns all inFlightEvents.
+func (aq *activeQueue) listInFlightEvents() []interface{} {
+	aq.lock.RLock()
+	defer aq.lock.RUnlock()
+	var values []interface{}
+	for event := aq.inFlightEvents.Front(); event != nil; event = event.Next() {
+		values = append(values, event.Value)
+	}
+	return values
+}
+
+// listInFlightPods returns all inFlightPods.
+func (aq *activeQueue) listInFlightPods() []*v1.Pod {
+	aq.lock.RLock()
+	defer aq.lock.RUnlock()
+	var pods []*v1.Pod
+	for _, obj := range aq.inFlightPods {
+		pods = append(pods, obj.Value.(*v1.Pod))
+	}
+	return pods
+}
+
+// clusterEventsForPod gets all cluster events that have happened while the pod for the given pInfo was being scheduled.
+func (aq *activeQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) {
+	aq.lock.RLock()
+	defer aq.lock.RUnlock()
+	logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", aq.inFlightEvents.Len(), "inFlightPodsSize", len(aq.inFlightPods))
+
+	// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
+	// So, given pInfo should have been Pop()ed before,
+	// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
+	inFlightPod, ok := aq.inFlightPods[pInfo.Pod.UID]
+	if !ok {
+		return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
+	}
+
+	var events []*clusterEvent
+	for event := inFlightPod.Next(); event != nil; event = event.Next() {
+		e, ok := event.Value.(*clusterEvent)
+		if !ok {
+			// Must be another in-flight Pod (*v1.Pod). Can be ignored.
+			continue
+		}
+		events = append(events, e)
+	}
+	return events, nil
+}
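+
+// For illustration, with hypothetical contents (not real state): if pod A is
+// popped, then event1 arrives, then pod B is popped, then event2 arrives,
+// inFlightEvents holds
+//
+//	[podA, event1, podB, event2]
+//
+// and clusterEventsForPod returns [event1, event2] for A but only [event2]
+// for B, since each pod only cares about events observed after it was popped.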
+
+// addEventIfPodInFlight adds the clusterEvent to inFlightEvents if the newPod is in inFlightPods.
+// It returns true if the event was pushed to inFlightEvents.
+func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool {
+	aq.lock.Lock()
+	defer aq.lock.Unlock()
+
+	_, ok := aq.inFlightPods[newPod.UID]
+	if ok {
+		aq.inFlightEvents.PushBack(&clusterEvent{
+			event:  event,
+			oldObj: oldPod,
+			newObj: newPod,
+		})
+	}
+	return ok
+}
+
+// addEventIfAnyInFlight adds the clusterEvent to inFlightEvents if any pod is in inFlightPods.
+// It returns true if the event was pushed to inFlightEvents.
+func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool {
+	aq.lock.Lock()
+	defer aq.lock.Unlock()
+
+	if len(aq.inFlightPods) != 0 {
+		aq.inFlightEvents.PushBack(&clusterEvent{
+			event:  event,
+			oldObj: oldObj,
+			newObj: newObj,
+		})
+		return true
+	}
+	return false
+}
+
+func (aq *activeQueue) schedulingCycle() int64 {
+	aq.lock.RLock()
+	defer aq.lock.RUnlock()
+	return aq.schedCycle
+}
+
+// done must be called for a pod returned by pop(). This allows the queue to
+// keep track of which pods are currently being processed.
+func (aq *activeQueue) done(pod types.UID) {
+	aq.lock.Lock()
+	defer aq.lock.Unlock()
+
+	inFlightPod, ok := aq.inFlightPods[pod]
+	if !ok {
+		// This Pod is already done()ed.
+		return
+	}
+	delete(aq.inFlightPods, pod)
+
+	// Remove the pod from the list.
+	aq.inFlightEvents.Remove(inFlightPod)
+
+	// Remove events which are only referred to by this Pod
+	// so that the inFlightEvents list doesn't grow infinitely.
+	// If the pod was at the head of the list, then all
+	// events between it and the next pod are no longer needed
+	// and can be removed.
+	for {
+		e := aq.inFlightEvents.Front()
+		if e == nil {
+			// Empty list.
+			break
+		}
+		if _, ok := e.Value.(*clusterEvent); !ok {
+			// A pod, must stop pruning.
+			break
+		}
+		aq.inFlightEvents.Remove(e)
+	}
+}
+
+// close closes the activeQueue.
+func (aq *activeQueue) close() {
+	aq.lock.Lock()
+	aq.closed = true
+	aq.lock.Unlock()
+}
+
+// broadcast notifies the pop() operation that a new pod was added to the activeQueue.
+func (aq *activeQueue) broadcast() {
+	aq.cond.Broadcast()
+}
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index e4b6f895d77..60309da1e30 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -27,7 +27,6 @@ limitations under the License.
 package queue
 
 import (
-	"container/list"
 	"context"
 	"fmt"
 	"math/rand"
@@ -159,8 +158,8 @@ type PriorityQueue struct {
 	clock clock.Clock
 
 	// lock takes precedence and should be taken first,
-	// before any other locks in the queue (activeQLock or nominator.nLock).
-	// Correct locking order is: lock > activeQLock > nominator.nLock.
+	// before any other locks in the queue (activeQueue.lock or nominator.nLock).
+	// Correct locking order is: lock > activeQueue.lock > nominator.nLock.
 	lock sync.RWMutex
 
 	// pod initial backoff duration.
@@ -170,58 +169,12 @@ type PriorityQueue struct {
 	// the maximum time a pod can stay in the unschedulablePods.
 	podMaxInUnschedulablePodsDuration time.Duration
 
-	// cond is a condition that is notified when the pod is added to activeQ.
-	// It is used with activeQLock.
-	cond sync.Cond
-
-	// activeQLock synchronizes all operations related to activeQ.
-	// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
-	// Caution: DO NOT take "lock" after taking "activeQLock".
-	// You should always take "lock" first, otherwise the queue could end up in deadlock.
- // "activeQLock" should not be taken after taking "nLock". - // Correct locking order is: lock > activeQLock > nominator.nLock. - activeQLock sync.RWMutex - - // inFlightPods holds the UID of all pods which have been popped out for which Done - // hasn't been called yet - in other words, all pods that are currently being - // processed (being scheduled, in permit, or in the binding cycle). - // - // The values in the map are the entry of each pod in the inFlightEvents list. - // The value of that entry is the *v1.Pod at the time that scheduling of that - // pod started, which can be useful for logging or debugging. - // - // It should be protected by activeQLock. - inFlightPods map[types.UID]*list.Element - - // inFlightEvents holds the events received by the scheduling queue - // (entry value is clusterEvent) together with in-flight pods (entry - // value is *v1.Pod). Entries get added at the end while the mutex is - // locked, so they get serialized. - // - // The pod entries are added in Pop and used to track which events - // occurred after the pod scheduling attempt for that pod started. - // They get removed when the scheduling attempt is done, at which - // point all events that occurred in the meantime are processed. - // - // After removal of a pod, events at the start of the list are no - // longer needed because all of the other in-flight pods started - // later. Those events can be removed. - // - // It should be protected by activeQLock. - inFlightEvents *list.List - - // activeQ is heap structure that scheduler actively looks at to find pods to - // schedule. Head of heap is the highest priority pod. It should be protected by activeQLock. - activeQ *heap.Heap[*framework.QueuedPodInfo] + activeQ activeQueuer // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff // are popped from this heap before the scheduler looks at activeQ podBackoffQ *heap.Heap[*framework.QueuedPodInfo] // unschedulablePods holds pods that have been tried and determined unschedulable. unschedulablePods *UnschedulablePods - // schedulingCycle represents sequence number of scheduling cycle and is incremented - // when a pod is popped. - // It should be protected by activeQLock. - schedulingCycle int64 // moveRequestCycle caches the sequence number of scheduling cycle when we // received a move request. Unschedulable pods in and before this scheduling // cycle will be put back to activeQueue if we were trying to schedule them @@ -234,11 +187,6 @@ type PriorityQueue struct { // queueingHintMap is keyed with profile name, valued with registered queueing hint functions. queueingHintMap QueueingHintMapPerProfile - // closed indicates that the queue is closed. - // It is mainly used to let Pop() exit its control loop while waiting for an item. - // It should be protected by activeQLock. 
- closed bool - nsLister listersv1.NamespaceLister metricsRecorder metrics.MetricAsyncRecorder @@ -382,24 +330,23 @@ func NewPriorityQueue( opt(&options) } + isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) + pq := &PriorityQueue{ clock: options.clock, stop: make(chan struct{}), podInitialBackoffDuration: options.podInitialBackoffDuration, podMaxBackoffDuration: options.podMaxBackoffDuration, podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, - activeQ: heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), + activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled), unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), - inFlightPods: make(map[types.UID]*list.Element), - inFlightEvents: list.New(), preEnqueuePluginMap: options.preEnqueuePluginMap, queueingHintMap: options.queueingHintMap, metricsRecorder: options.metricsRecorder, pluginMetricsSamplePercent: options.pluginMetricsSamplePercent, moveRequestCycle: -1, - isSchedulingQueueHintEnabled: utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints), + isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled, } - pq.cond.L = &pq.activeQLock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo) @@ -601,11 +548,11 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue gatedBefore := pInfo.Gated pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) - p.activeQLock.Lock() - defer p.activeQLock.Unlock() + p.activeQ.getLock().Lock() + defer p.activeQ.getLock().Unlock() if pInfo.Gated { // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. - if p.activeQ.Has(pInfo) { + if p.activeQ.unlocked().Has(pInfo) { return false } if p.podBackoffQ.Has(pInfo) { @@ -619,7 +566,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue pInfo.InitialAttemptTimestamp = &now } - p.activeQ.AddOrUpdate(pInfo) + p.activeQ.unlocked().AddOrUpdate(pInfo) p.unschedulablePods.delete(pInfo.Pod, gatedBefore) _ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found. @@ -640,7 +587,7 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) { pInfo := p.newQueuedPodInfo(pod) if added := p.moveToActiveQ(logger, pInfo, framework.PodAdd); added { - p.cond.Broadcast() + p.activeQ.broadcast() } } @@ -657,16 +604,10 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) { } if activated { - p.cond.Broadcast() + p.activeQ.broadcast() } } -func (p *PriorityQueue) existsInActiveQ(pInfo *framework.QueuedPodInfo) bool { - p.activeQLock.RLock() - defer p.activeQLock.RUnlock() - return p.activeQ.Has(pInfo) -} - func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { var pInfo *framework.QueuedPodInfo // Verify if the pod is present in unschedulablePods or backoffQ. @@ -701,46 +642,13 @@ func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool { // SchedulingCycle returns current scheduling cycle. 
func (p *PriorityQueue) SchedulingCycle() int64 { - p.activeQLock.RLock() - defer p.activeQLock.RUnlock() - return p.schedulingCycle -} - -// clusterEventsSinceElementUnlocked gets all cluster events that have happened during this inFlightPod is being scheduled. -// Note: this function assumes activeQLock to be locked by the caller. -func (p *PriorityQueue) clusterEventsSinceElementUnlocked(inFlightPod *list.Element) []*clusterEvent { - var events []*clusterEvent - for event := inFlightPod.Next(); event != nil; event = event.Next() { - e, ok := event.Value.(*clusterEvent) - if !ok { - // Must be another in-flight Pod (*v1.Pod). Can be ignored. - continue - } - events = append(events, e) - } - return events -} - -func (p *PriorityQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) { - p.activeQLock.RLock() - defer p.activeQLock.RUnlock() - logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods)) - - // AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding. - // So, given pInfo should have been Pop()ed before, - // we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents. - inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID] - if !ok { - return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler") - } - - return p.clusterEventsSinceElementUnlocked(inFlightPod), nil + return p.activeQ.schedulingCycle() } // determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod // and determines the scheduling hint for this Pod while checking the events that happened during in-flight. func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy { - events, err := p.clusterEventsForPod(logger, pInfo) + events, err := p.activeQ.clusterEventsForPod(logger, pInfo) if err != nil { logger.Error(err, "Error getting cluster events for pod", "pod", klog.KObj(pInfo.Pod)) return queueAfterBackoff @@ -834,7 +742,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod)) } - if p.existsInActiveQ(pInfo) { + if p.activeQ.has(pInfo) { return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod)) } if p.podBackoffQ.Has(pInfo) { @@ -864,7 +772,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint, "unschedulable plugins", rejectorPlugins) if queue == activeQ { // When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out. - p.cond.Broadcast() + p.activeQ.broadcast() } return nil @@ -895,7 +803,7 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) { } if activated { - p.cond.Broadcast() + p.activeQ.broadcast() } } @@ -925,81 +833,18 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) { // Note: This method should NOT be locked by the p.lock at any moment, // as it would lead to scheduling throughput degradation. 
func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) { - p.activeQLock.Lock() - defer p.activeQLock.Unlock() - for p.activeQ.Len() == 0 { - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. - // When Close() is called, the p.closed is set and the condition is broadcast, - // which causes this loop to continue and return from the Pop(). - if p.closed { - logger.V(2).Info("Scheduling queue is closed") - return nil, nil - } - p.cond.Wait() - } - pInfo, err := p.activeQ.Pop() - if err != nil { - return nil, err - } - pInfo.Attempts++ - p.schedulingCycle++ - // In flight, no concurrent events yet. - if p.isSchedulingQueueHintEnabled { - p.inFlightPods[pInfo.Pod.UID] = p.inFlightEvents.PushBack(pInfo.Pod) - } - - // Update metrics and reset the set of unschedulable plugins for the next attempt. - for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) { - metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec() - } - pInfo.UnschedulablePlugins.Clear() - pInfo.PendingPlugins.Clear() - - return pInfo, nil + return p.activeQ.pop(logger) } // Done must be called for pod returned by Pop. This allows the queue to // keep track of which pods are currently being processed. func (p *PriorityQueue) Done(pod types.UID) { - p.activeQLock.Lock() - defer p.activeQLock.Unlock() - - p.done(pod) -} - -func (p *PriorityQueue) done(pod types.UID) { if !p.isSchedulingQueueHintEnabled { // do nothing if schedulingQueueHint is disabled. // In that case, we don't have inFlightPods and inFlightEvents. return } - inFlightPod, ok := p.inFlightPods[pod] - if !ok { - // This Pod is already done()ed. - return - } - delete(p.inFlightPods, pod) - - // Remove the pod from the list. - p.inFlightEvents.Remove(inFlightPod) - - // Remove events which are only referred to by this Pod - // so that the inFlightEvents list doesn't grow infinitely. - // If the pod was at the head of the list, then all - // events between it and the next pod are no longer needed - // and can be removed. - for { - e := p.inFlightEvents.Front() - if e == nil { - // Empty list. - break - } - if _, ok := e.Value.(*clusterEvent); !ok { - // A pod, must stop pruning. - break - } - p.inFlightEvents.Remove(e) - } + p.activeQ.done(pod) } // isPodUpdated checks if the pod is updated in a way that it may have become @@ -1022,12 +867,12 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { } func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { - p.activeQLock.Lock() - defer p.activeQLock.Unlock() - if pInfo, exists := p.activeQ.Get(oldPodInfo); exists { + p.activeQ.getLock().Lock() + defer p.activeQ.getLock().Unlock() + if pInfo, exists := p.activeQ.unlocked().Get(oldPodInfo); exists { _ = pInfo.Update(newPod) p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - p.activeQ.AddOrUpdate(pInfo) + p.activeQ.unlocked().AddOrUpdate(pInfo) return true } return false @@ -1042,25 +887,15 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { defer p.lock.Unlock() if p.isSchedulingQueueHintEnabled { - p.activeQLock.Lock() - // the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers. - if _, ok := p.inFlightPods[newPod.UID]; ok { + // The inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers. 
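+ // (Nothing is requeued at this point; addEventIfPodInFlight below only
+ // records the event so it can be evaluated once the attempt finishes.)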
+ // Record this update as Pod/Update because + // this update may make the Pod schedulable in case it gets rejected and comes back to the queue. + // We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue. + // See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context. + if exists := p.activeQ.addEventIfPodInFlight(oldPod, newPod, framework.UnscheduledPodUpdate); exists { logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod)) - - // Record this update as Pod/Update because - // this update may make the Pod schedulable in case it gets rejected and comes back to the queue. - // We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue. - // See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context. - p.inFlightEvents.PushBack(&clusterEvent{ - event: framework.UnscheduledPodUpdate, - oldObj: oldPod, - newObj: newPod, - }) - - p.activeQLock.Unlock() return } - p.activeQLock.Unlock() } if oldPod != nil { @@ -1098,7 +933,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { p.unschedulablePods.delete(pInfo.Pod, gated) } if queue == activeQ { - p.cond.Broadcast() + p.activeQ.broadcast() break } } @@ -1113,7 +948,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { } if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added { - p.cond.Broadcast() + p.activeQ.broadcast() } return } @@ -1125,7 +960,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { // If pod is not in any of the queues, we put it in the active queue. pInfo := p.newQueuedPodInfo(newPod) if added := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); added { - p.cond.Broadcast() + p.activeQ.broadcast() } } @@ -1136,9 +971,9 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) { defer p.lock.Unlock() p.DeleteNominatedPodIfExists(pod) pInfo := newQueuedPodInfoForLookup(pod) - p.activeQLock.Lock() - defer p.activeQLock.Unlock() - if err := p.activeQ.Delete(pInfo); err != nil { + p.activeQ.getLock().Lock() + defer p.activeQ.getLock().Unlock() + if err := p.activeQ.unlocked().Delete(pInfo); err != nil { // The item was probably not found in the activeQ. p.podBackoffQ.Delete(pInfo) if pInfo = p.unschedulablePods.get(pod); pInfo != nil { @@ -1227,7 +1062,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra return activeQ } if pInfo.Gated { - // In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in addToActiveQ. + // In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in moveToActiveQ. return unschedulablePods } @@ -1279,24 +1114,19 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn } } - p.activeQLock.Lock() - defer p.activeQLock.Unlock() - p.moveRequestCycle = p.schedulingCycle + p.moveRequestCycle = p.activeQ.schedulingCycle() - if p.isSchedulingQueueHintEnabled && len(p.inFlightPods) != 0 { - logger.V(5).Info("Event received while pods are in flight", "event", event.Label, "numPods", len(p.inFlightPods)) + if p.isSchedulingQueueHintEnabled { // AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in // AddUnschedulableIfNotPresent we need to know whether events were // observed while scheduling them. 
- p.inFlightEvents.PushBack(&clusterEvent{ - event: event, - oldObj: oldObj, - newObj: newObj, - }) + if added := p.activeQ.addEventIfAnyInFlight(oldObj, newObj, event); added { + logger.V(5).Info("Event received while pods are in flight", "event", event.Label) + } } if activated { - p.cond.Broadcast() + p.activeQ.broadcast() } } @@ -1328,13 +1158,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Lo // PodsInActiveQ returns all the Pods in the activeQ. func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod { - p.activeQLock.RLock() - defer p.activeQLock.RUnlock() - var result []*v1.Pod - for _, pInfo := range p.activeQ.List() { - result = append(result, pInfo.Pod) - } - return result + return p.activeQ.list() } var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v" @@ -1345,7 +1169,7 @@ var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v" func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { p.lock.RLock() defer p.lock.RUnlock() - result := p.PodsInActiveQ() + result := p.activeQ.list() activeQLen := len(result) for _, pInfo := range p.podBackoffQ.List() { result = append(result, pInfo.Pod) @@ -1356,12 +1180,12 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap)) } -// Note: this function assumes the caller locks p.lock.RLock. +// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo { pod := np.ToPod() pInfoLookup := newQueuedPodInfoForLookup(pod) - queuedPodInfo, exists := p.activeQ.Get(pInfoLookup) + queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup) if exists { return queuedPodInfo.PodInfo } @@ -1382,8 +1206,8 @@ func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo { func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo { p.lock.RLock() defer p.lock.RUnlock() - p.activeQLock.RLock() - defer p.activeQLock.RUnlock() + p.activeQ.getLock().RLock() + defer p.activeQ.getLock().RUnlock() pods := make([]*framework.PodInfo, len(nominatedPods)) for i, np := range nominatedPods { pods[i] = p.nominatedPodToInfo(np).DeepCopy() @@ -1396,11 +1220,8 @@ func (p *PriorityQueue) Close() { p.lock.Lock() defer p.lock.Unlock() close(p.stop) - p.activeQLock.Lock() - // closed field is locked by activeQLock as it is checked in Pop() without p.lock set. - p.closed = true - p.activeQLock.Unlock() - p.cond.Broadcast() + p.activeQ.close() + p.activeQ.broadcast() } // DeleteNominatedPodIfExists deletes from nominatedPods. @@ -1571,10 +1392,10 @@ func (np PodRef) ToPod() *v1.Pod { // by their UID and update/delete them. type nominator struct { // nLock synchronizes all operations related to nominator. - // Caution: DO NOT take ("SchedulingQueue.lock" or "SchedulingQueue.activeQLock") after taking "nLock". - // You should always take "SchedulingQueue.lock" and "SchedulingQueue.activeQLock" first, + // Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock". + // You should always take "SchedulingQueue.lock" and "activeQueue.lock" first, // otherwise the nominator could end up in deadlock. - // Correct locking order is: SchedulingQueue.lock > SchedulingQueue.activeQLock > nLock. + // Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock. 
nLock sync.RWMutex // podLister is used to verify if the given pod is alive. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 0033482371e..013d33aa202 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -17,7 +17,6 @@ limitations under the License. package queue import ( - "container/list" "context" "fmt" "math" @@ -172,14 +171,6 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } } -func listToValues(l *list.List) []interface{} { - var values []interface{} - for e := l.Front(); e != nil; e = e.Next() { - values = append(values, e.Value) - } - return values -} - func Test_InFlightPods(t *testing.T) { logger, _ := ktesting.NewTestContext(t) pod := st.MakePod().Name("targetpod").UID("pod1").Obj() @@ -715,8 +706,8 @@ func Test_InFlightPods(t *testing.T) { } actualInFlightPods := make(map[types.UID]*v1.Pod) - for uid, element := range q.inFlightPods { - actualInFlightPods[uid] = element.Value.(*v1.Pod) + for _, pod := range q.activeQ.listInFlightPods() { + actualInFlightPods[pod.UID] = pod } wantInFlightPods := make(map[types.UID]*v1.Pod) for _, pod := range test.wantInFlightPods { @@ -733,35 +724,36 @@ func Test_InFlightPods(t *testing.T) { } wantInFlightEvents = append(wantInFlightEvents, value) } - if diff := cmp.Diff(wantInFlightEvents, listToValues(q.inFlightEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" { + if diff := cmp.Diff(wantInFlightEvents, q.activeQ.listInFlightEvents(), cmp.AllowUnexported(clusterEvent{})); diff != "" { t.Errorf("Unexpected diff in inFlightEvents (-want, +got):\n%s", diff) } if test.wantActiveQPodNames != nil { - podInfos := q.activeQ.List() - if len(podInfos) != len(test.wantActiveQPodNames) { - diff := cmp.Diff(test.wantActiveQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool { - return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name - })) - t.Fatalf("Length of activeQ is not expected. Got %v, want %v.\n%s", len(podInfos), len(test.wantActiveQPodNames), diff) + pods := q.activeQ.list() + var podNames []string + for _, pod := range pods { + podNames = append(podNames, pod.Name) + } + if diff := cmp.Diff(test.wantActiveQPodNames, podNames); diff != "" { + t.Fatalf("Unexpected diff of activeQ pod names (-want, +got):\n%s", diff) } wantPodNames := sets.New(test.wantActiveQPodNames...) - for _, podInfo := range podInfos { - podGotFromActiveQ := podInfo.Pod - if !wantPodNames.Has(podGotFromActiveQ.Name) { - t.Fatalf("Pod %v was not expected to be in the activeQ.", podGotFromActiveQ.Name) + for _, pod := range pods { + if !wantPodNames.Has(pod.Name) { + t.Fatalf("Pod %v was not expected to be in the activeQ.", pod.Name) } } } if test.wantBackoffQPodNames != nil { podInfos := q.podBackoffQ.List() - if len(podInfos) != len(test.wantBackoffQPodNames) { - diff := cmp.Diff(test.wantBackoffQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool { - return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name - })) - t.Fatalf("Length of backoffQ is not expected. 
Got %v, want %v.\n%s", len(podInfos), len(test.wantBackoffQPodNames), diff) + var podNames []string + for _, pInfo := range podInfos { + podNames = append(podNames, pInfo.Pod.Name) + } + if diff := cmp.Diff(test.wantBackoffQPodNames, podNames); diff != "" { + t.Fatalf("Unexpected diff of backoffQ pod names (-want, +got):\n%s", diff) } wantPodNames := sets.New(test.wantBackoffQPodNames...) @@ -1073,7 +1065,7 @@ func TestPriorityQueue_Update(t *testing.T) { prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod) // We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods. - q.activeQ.AddOrUpdate(podInfo) + q.activeQ.unlocked().AddOrUpdate(podInfo) if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1109,7 +1101,7 @@ func TestPriorityQueue_Update(t *testing.T) { pInfo = pInfoFromBackoff } - if pInfoFromActive, exists := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists { + if pInfoFromActive, exists := q.activeQ.unlocked().Get(newQueuedPodInfoForLookup(newPod)); exists { if tt.wantQ != activeQ { t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) } @@ -1204,10 +1196,10 @@ func TestPriorityQueue_Delete(t *testing.T) { q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(logger, unschedulablePodInfo.Pod) q.Delete(highPriNominatedPodInfo.Pod) - if !q.activeQ.Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { + if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) { t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) } - if q.activeQ.Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { + if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } if len(q.nominator.nominatedPods) != 1 { @@ -1263,7 +1255,7 @@ func TestPriorityQueue_Activate(t *testing.T) { // Prepare activeQ/unschedulablePods/podBackoffQ according to the table for _, qPodInfo := range tt.qPodInfoInActiveQ { - q.activeQ.AddOrUpdate(qPodInfo) + q.activeQ.unlocked().AddOrUpdate(qPodInfo) } for _, qPodInfo := range tt.qPodInfoInUnschedulablePods { @@ -1278,13 +1270,13 @@ func TestPriorityQueue_Activate(t *testing.T) { q.Activate(logger, map[string]*v1.Pod{"test_pod": tt.qPodInfoToActivate.PodInfo.Pod}) // Check the result after activation by the length of activeQ - if wantLen := len(tt.want); q.activeQ.Len() != wantLen { - t.Errorf("length compare: want %v, got %v", wantLen, q.activeQ.Len()) + if wantLen := len(tt.want); q.activeQ.len() != wantLen { + t.Errorf("length compare: want %v, got %v", wantLen, q.activeQ.len()) } // Check if the specific pod exists in activeQ for _, want := range tt.want { - if !q.activeQ.Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { + if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) { t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name) } } @@ -1570,7 +1562,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. 
} cl := testingclock.NewFakeClock(now) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) } @@ -1587,7 +1579,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing. t.Fatalf("expected pod to be queued to backoffQ, but it was not") } - if q.activeQ.Len() == 0 && test.expectedQ == activeQ { + if q.activeQ.len() == 0 && test.expectedQ == activeQ { t.Fatalf("expected pod to be queued to activeQ, but it was not") } @@ -1614,12 +1606,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID) - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -1635,7 +1627,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct a Pod, but don't associate its scheduler failure to any plugin hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1") - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1648,7 +1640,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { expectInFlightPods(t, q) // Construct another Pod, and associate its scheduler failure to plugin "barPlugin". hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2") - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp2)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp2)) if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 { t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name) } @@ -1663,8 +1655,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { // because of the queueing hint function registered for NodeAdd/fooPlugin. q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) q.Add(logger, medPriorityPodInfo.Pod) - if q.activeQ.Len() != 1 { - t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len()) + if q.activeQ.len() != 1 { + t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.len()) } // Pop out the medPriorityPodInfo in activeQ. 
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { @@ -1683,18 +1675,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID) - q.schedulingCycle++ - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID) - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID) - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1)) if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 { t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name) } @@ -1734,8 +1725,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { c.Step(q.podInitialBackoffDuration) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) - if q.activeQ.Len() != 4 { - t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len()) + if q.activeQ.len() != 4 { + t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len()) } if q.podBackoffQ.Len() != 0 { t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) @@ -1789,8 +1780,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi // This NodeAdd event moves unschedulablePodInfo and highPriorityPodInfo to the backoffQ, // because of the queueing hint function registered for NodeAdd/fooPlugin. q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) - if q.activeQ.Len() != 1 { - t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len()) + if q.activeQ.len() != 1 { + t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.len()) } // Pop out the medPriorityPodInfo in activeQ. if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod { @@ -1810,7 +1801,6 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi } } - q.schedulingCycle++ unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin") highPriorityQueuedPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin") hpp1QueuedPodInfo := q.newQueuedPodInfo(hpp1) @@ -1841,8 +1831,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi c.Step(q.podInitialBackoffDuration) q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ. 
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil) - if q.activeQ.Len() != 4 { - t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len()) + if q.activeQ.len() != 4 { + t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len()) } if q.podBackoffQ.Len() != 0 { t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) @@ -1863,16 +1853,17 @@ func clonePod(pod *v1.Pod, newName string) *v1.Pod { func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) { t.Helper() var actualUIDs []types.UID - for uid := range q.inFlightPods { - actualUIDs = append(actualUIDs, uid) + for _, pod := range q.activeQ.listInFlightPods() { + actualUIDs = append(actualUIDs, pod.UID) } sortUIDs := cmpopts.SortSlices(func(a, b types.UID) bool { return a < b }) if diff := cmp.Diff(uids, actualUIDs, sortUIDs); diff != "" { t.Fatalf("Unexpected content of inFlightPods (-want, +have):\n%s", diff) } actualUIDs = nil - for e := q.inFlightEvents.Front(); e != nil; e = e.Next() { - if pod, ok := e.Value.(*v1.Pod); ok { + events := q.activeQ.listInFlightEvents() + for _, e := range events { + if pod, ok := e.(*v1.Pod); ok { actualUIDs = append(actualUIDs, pod.UID) } } @@ -1954,7 +1945,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod)) if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod { t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name) } @@ -1969,7 +1960,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) { q.AssignedPodAdded(logger, tt.updatedAssignedPod) - if q.activeQ.Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { + if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue { t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue) } }) @@ -2069,11 +2060,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) { defer cancel() q := NewTestQueue(ctx, newDefaultQueueSort()) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } @@ -2419,7 +2410,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. 
- q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePod)) if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) } @@ -2557,11 +2548,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { }) // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below. - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPod)) if p, err := q.Pop(logger); err != nil || p.Pod != highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name) } - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(midPod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(midPod)) if p, err := q.Pop(logger); err != nil || p.Pod != midPod { t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name) } @@ -2610,7 +2601,7 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) { operations: []operation{ addPodUnschedulablePods, addPodUnschedulablePods, - flushUnschedulerQ, + flushUnscheduledQ, }, operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil}, expected: []*framework.QueuedPodInfo{pInfo2, pInfo1}, @@ -2621,7 +2612,7 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) { operations: []operation{ addPodUnschedulablePods, addPodUnschedulablePods, - flushUnschedulerQ, + flushUnscheduledQ, }, operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil}, expected: []*framework.QueuedPodInfo{pInfo2, pInfo1}, @@ -2650,15 +2641,17 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) { } expectedLen := len(test.expected) - if queue.activeQ.Len() != expectedLen { - t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.Len()) + if queue.activeQ.len() != expectedLen { + t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.len()) } for i := 0; i < expectedLen; i++ { - if pInfo, err := queue.activeQ.Pop(); err != nil { + if pInfo, err := queue.activeQ.pop(logger); err != nil { t.Errorf("Error while popping the head of the queue: %v", err) } else { podInfoList = append(podInfoList, pInfo) + // Cleanup attempts counter incremented in activeQ.pop() + pInfo.Attempts = 0 } } @@ -2679,7 +2672,7 @@ var ( // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. // UnschedulablePlugins will get cleared by Pop, so make a copy first. unschedulablePlugins := pInfo.UnschedulablePlugins.Clone() - queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2695,7 +2688,7 @@ var ( } popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. 
- queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) + queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod)) p, err := queue.Pop(logger) if err != nil { t.Fatalf("Unexpected error during Pop: %v", err) @@ -2709,7 +2702,7 @@ var ( } } addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.activeQ.AddOrUpdate(pInfo) + queue.activeQ.unlocked().AddOrUpdate(pInfo) } addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { if !pInfo.Gated { @@ -2744,7 +2737,7 @@ var ( moveClockForward = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*testingclock.FakeClock).Step(2 * time.Second) } - flushUnschedulerQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { + flushUnscheduledQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*testingclock.FakeClock).Step(queue.podMaxInUnschedulablePodsDuration) queue.flushUnschedulablePodsLeftover(logger) } @@ -2817,15 +2810,17 @@ func TestPodTimestamp(t *testing.T) { } expectedLen := len(test.expected) - if queue.activeQ.Len() != expectedLen { - t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.Len()) + if queue.activeQ.len() != expectedLen { + t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.len()) } for i := 0; i < expectedLen; i++ { - if pInfo, err := queue.activeQ.Pop(); err != nil { + if pInfo, err := queue.activeQ.pop(logger); err != nil { t.Errorf("Error while popping the head of the queue: %v", err) } else { podInfoList = append(podInfoList, pInfo) + // Cleanup attempts counter incremented in activeQ.pop() + pInfo.Attempts = 0 } } @@ -3453,12 +3448,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { q := NewTestQueue(ctx, newDefaultQueueSort()) for i, podInfo := range tt.podInfos { // To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below. - q.activeQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) + q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod)) if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name) } podInfo.UnschedulablePlugins = sets.New("plugin") - err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.schedulingCycle) + err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.activeQ.schedulingCycle()) if err != nil { t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) }