Move activeQ related fields to separate struct in scheduling queue

Maciej Skoczeń 2024-07-22 10:13:13 +00:00
parent 5d10ab5cd5
commit 8e630a9f68
3 changed files with 465 additions and 312 deletions

View File

@ -0,0 +1,337 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"container/list"
"fmt"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// activeQueuer is a wrapper for activeQ related operations.
// Its methods, except the "unlocked" ones, take the lock internally.
// Note: be careful when using unlocked() methods.
// The lock returned by getLock() should be used only to protect the unlocked() methods,
// and it is forbidden to call any other activeQueuer method while holding that lock.
type activeQueuer interface {
getLock() *sync.RWMutex
unlocked() unlockedActiveQueuer
pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
list() []*v1.Pod
len() int
has(pInfo *framework.QueuedPodInfo) bool
listInFlightEvents() []interface{}
listInFlightPods() []*v1.Pod
clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error)
addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool
addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool
schedulingCycle() int64
done(pod types.UID)
close()
broadcast()
}
// unlockedActiveQueuer defines activeQ methods that do not take the lock themselves.
// The lock returned by getLock() should be held while calling these methods.
type unlockedActiveQueuer interface {
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
Has(pInfo *framework.QueuedPodInfo) bool
AddOrUpdate(pInfo *framework.QueuedPodInfo)
Delete(pInfo *framework.QueuedPodInfo) error
}
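// Illustrative sketch, not part of this commit: exampleUpdateUnderLock is a
// hypothetical helper showing the intended usage pattern. Take the lock returned
// by getLock() and call only unlocked() methods under it, mirroring how
// PriorityQueue.updateInActiveQueue uses the queue; calling any other activeQueuer
// method while holding this lock would deadlock because the mutex is not reentrant.
func exampleUpdateUnderLock(aq activeQueuer, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool {
aq.getLock().Lock()
defer aq.getLock().Unlock()
if pInfo, exists := aq.unlocked().Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
aq.unlocked().AddOrUpdate(pInfo)
return true
}
return false
}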
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
type activeQueue struct {
// lock synchronizes all operations related to activeQ.
// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
// Caution: DO NOT take "SchedulingQueue.lock" after taking "lock".
// You should always take "SchedulingQueue.lock" first, otherwise the queue could end up in deadlock.
// "lock" should not be taken after taking "nLock".
// Correct locking order is: SchedulingQueue.lock > lock > nominator.nLock.
lock sync.RWMutex
// queue is a heap structure that the scheduler actively looks at to find pods to
// schedule. The head of the heap is the highest priority pod.
queue *heap.Heap[*framework.QueuedPodInfo]
// cond is a condition that is notified when the pod is added to activeQ.
// It is used with lock.
cond sync.Cond
// inFlightPods holds the UID of all pods which have been popped out for which Done
// hasn't been called yet - in other words, all pods that are currently being
// processed (being scheduled, in permit, or in the binding cycle).
//
// The values in the map are the entry of each pod in the inFlightEvents list.
// The value of that entry is the *v1.Pod at the time that scheduling of that
// pod started, which can be useful for logging or debugging.
inFlightPods map[types.UID]*list.Element
// inFlightEvents holds the events received by the scheduling queue
// (entry value is clusterEvent) together with in-flight pods (entry
// value is *v1.Pod). Entries get added at the end while the mutex is
// locked, so they get serialized.
//
// The pod entries are added in Pop and used to track which events
// occurred after the pod scheduling attempt for that pod started.
// They get removed when the scheduling attempt is done, at which
// point all events that occurred in the meantime are processed.
//
// After removal of a pod, events at the start of the list are no
// longer needed because all of the other in-flight pods started
// later. Those events can be removed.
inFlightEvents *list.List
// schedCycle represents the sequence number of the scheduling cycle and is incremented
// when a pod is popped.
schedCycle int64
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
// isSchedulingQueueHintEnabled indicates whether the SchedulerQueueingHints feature gate is enabled.
isSchedulingQueueHintEnabled bool
}
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool) *activeQueue {
aq := &activeQueue{
queue: queue,
inFlightPods: make(map[types.UID]*list.Element),
inFlightEvents: list.New(),
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
}
aq.cond.L = &aq.lock
return aq
}
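// Illustrative sketch, not part of this commit: exampleNewActiveQueue is a
// hypothetical helper showing how the queue is expected to be constructed,
// mirroring the wiring in NewPriorityQueue: an activeQ heap plus the
// SchedulerQueueingHints feature-gate value.
func exampleNewActiveQueue(lessFn framework.LessFunc, queueingHintsEnabled bool) *activeQueue {
h := heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder())
return newActiveQueue(h, queueingHintsEnabled)
}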
// getLock returns the lock of activeQueue. It should be used only to protect the unlocked() methods.
func (aq *activeQueue) getLock() *sync.RWMutex {
return &aq.lock
}
// unlocked returns the queue methods that are not protected by the lock itself.
// The lock returned by getLock() should be held while calling them.
func (aq *activeQueue) unlocked() unlockedActiveQueuer {
return aq.queue
}
// pop removes the head of the queue and returns it.
// It blocks if the queue is empty and waits until a new item is added to the queue.
// It increments scheduling cycle when a pod is popped.
func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
aq.lock.Lock()
defer aq.lock.Unlock()
for aq.queue.Len() == 0 {
// When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
// When Close() is called, aq.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if aq.closed {
logger.V(2).Info("Scheduling queue is closed")
return nil, nil
}
aq.cond.Wait()
}
pInfo, err := aq.queue.Pop()
if err != nil {
return nil, err
}
pInfo.Attempts++
aq.schedCycle++
// In flight, no concurrent events yet.
if aq.isSchedulingQueueHintEnabled {
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
}
// Update metrics and reset the set of unschedulable plugins for the next attempt.
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
}
pInfo.UnschedulablePlugins.Clear()
pInfo.PendingPlugins.Clear()
return pInfo, nil
}
// list returns all pods that are in the queue.
func (aq *activeQueue) list() []*v1.Pod {
aq.lock.RLock()
defer aq.lock.RUnlock()
var result []*v1.Pod
for _, pInfo := range aq.queue.List() {
result = append(result, pInfo.Pod)
}
return result
}
// len returns the length of the queue.
func (aq *activeQueue) len() int {
return aq.queue.Len()
}
// has reports whether pInfo exists in the queue.
func (aq *activeQueue) has(pInfo *framework.QueuedPodInfo) bool {
aq.lock.RLock()
defer aq.lock.RUnlock()
return aq.queue.Has(pInfo)
}
// listInFlightEvents returns all inFlightEvents.
func (aq *activeQueue) listInFlightEvents() []interface{} {
aq.lock.RLock()
defer aq.lock.RUnlock()
var values []interface{}
for event := aq.inFlightEvents.Front(); event != nil; event = event.Next() {
values = append(values, event.Value)
}
return values
}
// listInFlightPods returns all inFlightPods.
func (aq *activeQueue) listInFlightPods() []*v1.Pod {
aq.lock.RLock()
defer aq.lock.RUnlock()
var pods []*v1.Pod
for _, obj := range aq.inFlightPods {
pods = append(pods, obj.Value.(*v1.Pod))
}
return pods
}
// clusterEventsForPod gets all cluster events that have happened while the pod for pInfo is being scheduled.
func (aq *activeQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) {
aq.lock.RLock()
defer aq.lock.RUnlock()
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", aq.inFlightEvents.Len(), "inFlightPodsSize", len(aq.inFlightPods))
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
// So, given pInfo should have been Pop()ed before,
// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
inFlightPod, ok := aq.inFlightPods[pInfo.Pod.UID]
if !ok {
return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
}
var events []*clusterEvent
for event := inFlightPod.Next(); event != nil; event = event.Next() {
e, ok := event.Value.(*clusterEvent)
if !ok {
// Must be another in-flight Pod (*v1.Pod). Can be ignored.
continue
}
events = append(events, e)
}
return events, nil
}
// addEventIfPodInFlight adds clusterEvent to inFlightEvents if the newPod is in inFlightPods.
// It returns true if it pushed the event to inFlightEvents.
func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event framework.ClusterEvent) bool {
aq.lock.Lock()
defer aq.lock.Unlock()
_, ok := aq.inFlightPods[newPod.UID]
if ok {
aq.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldPod,
newObj: newPod,
})
}
return ok
}
// addEventIfAnyInFlight adds clusterEvent to inFlightEvents if any pod is in inFlightPods.
// It returns true if it pushed the event to inFlightEvents.
func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool {
aq.lock.Lock()
defer aq.lock.Unlock()
if len(aq.inFlightPods) != 0 {
aq.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldObj,
newObj: newObj,
})
return true
}
return false
}
func (aq *activeQueue) schedulingCycle() int64 {
aq.lock.RLock()
defer aq.lock.RUnlock()
return aq.schedCycle
}
// done must be called for the pod returned by Pop. This allows the queue to
// keep track of which pods are currently being processed.
func (aq *activeQueue) done(pod types.UID) {
aq.lock.Lock()
defer aq.lock.Unlock()
inFlightPod, ok := aq.inFlightPods[pod]
if !ok {
// This Pod is already done()ed.
return
}
delete(aq.inFlightPods, pod)
// Remove the pod from the list.
aq.inFlightEvents.Remove(inFlightPod)
// Remove events which are only referred to by this Pod
// so that the inFlightEvents list doesn't grow infinitely.
// If the pod was at the head of the list, then all
// events between it and the next pod are no longer needed
// and can be removed.
for {
e := aq.inFlightEvents.Front()
if e == nil {
// Empty list.
break
}
if _, ok := e.Value.(*clusterEvent); !ok {
// A pod, must stop pruning.
break
}
aq.inFlightEvents.Remove(e)
}
}
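// Illustrative sketch, not part of this commit: exampleInFlightLifecycle is a
// hypothetical helper walking through the in-flight tracking, assuming the queue
// was built with isSchedulingQueueHintEnabled=true. A pod is popped, a cluster
// event arriving in the meantime is recorded, the failed scheduling attempt can
// inspect those events, and done() finally prunes the entries.
func exampleInFlightLifecycle(logger klog.Logger, aq *activeQueue, event framework.ClusterEvent, node *v1.Node) {
pInfo, err := aq.pop(logger)
if err != nil || pInfo == nil {
return
}
// The event is appended to inFlightEvents because at least one pod is in flight.
_ = aq.addEventIfAnyInFlight(nil, node, event)
// The scheduling queue can later ask which events happened during this attempt.
if events, err := aq.clusterEventsForPod(logger, pInfo); err == nil {
logger.V(5).Info("Events observed while the pod was in flight", "pod", klog.KObj(pInfo.Pod), "count", len(events))
}
// done() removes the pod from inFlightPods and prunes events no other in-flight pod needs.
aq.done(pInfo.Pod.UID)
}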
// close closes the activeQueue.
func (aq *activeQueue) close() {
aq.lock.Lock()
aq.closed = true
aq.lock.Unlock()
}
// broadcast notifies the pop() operation that new pod(s) were added to the activeQueue.
func (aq *activeQueue) broadcast() {
aq.cond.Broadcast()
}
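// Illustrative sketch, not part of this commit: exampleAddAndWake is a hypothetical
// helper showing how broadcast() pairs with pop(). A caller adds a pod under
// getLock(), releases the lock, and then wakes any goroutine blocked in pop(),
// mirroring how PriorityQueue calls p.activeQ.broadcast() after moveToActiveQ.
func exampleAddAndWake(logger klog.Logger, aq *activeQueue, pInfo *framework.QueuedPodInfo) {
aq.getLock().Lock()
aq.unlocked().AddOrUpdate(pInfo)
aq.getLock().Unlock()
aq.broadcast()
if popped, err := aq.pop(logger); err == nil && popped != nil {
logger.V(5).Info("Popped pod from activeQ", "pod", klog.KObj(popped.Pod))
}
}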

View File

@ -27,7 +27,6 @@ limitations under the License.
package queue
import (
"container/list"
"context"
"fmt"
"math/rand"
@ -159,8 +158,8 @@ type PriorityQueue struct {
clock clock.Clock
// lock takes precedence and should be taken first,
// before any other locks in the queue (activeQLock or nominator.nLock).
// Correct locking order is: lock > activeQLock > nominator.nLock.
// before any other locks in the queue (activeQueue.lock or nominator.nLock).
// Correct locking order is: lock > activeQueue.lock > nominator.nLock.
lock sync.RWMutex
// pod initial backoff duration.
@ -170,58 +169,12 @@ type PriorityQueue struct {
// the maximum time a pod can stay in the unschedulablePods.
podMaxInUnschedulablePodsDuration time.Duration
// cond is a condition that is notified when the pod is added to activeQ.
// It is used with activeQLock.
cond sync.Cond
// activeQLock synchronizes all operations related to activeQ.
// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
// Caution: DO NOT take "lock" after taking "activeQLock".
// You should always take "lock" first, otherwise the queue could end up in deadlock.
// "activeQLock" should not be taken after taking "nLock".
// Correct locking order is: lock > activeQLock > nominator.nLock.
activeQLock sync.RWMutex
// inFlightPods holds the UID of all pods which have been popped out for which Done
// hasn't been called yet - in other words, all pods that are currently being
// processed (being scheduled, in permit, or in the binding cycle).
//
// The values in the map are the entry of each pod in the inFlightEvents list.
// The value of that entry is the *v1.Pod at the time that scheduling of that
// pod started, which can be useful for logging or debugging.
//
// It should be protected by activeQLock.
inFlightPods map[types.UID]*list.Element
// inFlightEvents holds the events received by the scheduling queue
// (entry value is clusterEvent) together with in-flight pods (entry
// value is *v1.Pod). Entries get added at the end while the mutex is
// locked, so they get serialized.
//
// The pod entries are added in Pop and used to track which events
// occurred after the pod scheduling attempt for that pod started.
// They get removed when the scheduling attempt is done, at which
// point all events that occurred in the meantime are processed.
//
// After removal of a pod, events at the start of the list are no
// longer needed because all of the other in-flight pods started
// later. Those events can be removed.
//
// It should be protected by activeQLock.
inFlightEvents *list.List
// activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod. It should be protected by activeQLock.
activeQ *heap.Heap[*framework.QueuedPodInfo]
activeQ activeQueuer
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
podBackoffQ *heap.Heap[*framework.QueuedPodInfo]
// unschedulablePods holds pods that have been tried and determined unschedulable.
unschedulablePods *UnschedulablePods
// schedulingCycle represents sequence number of scheduling cycle and is incremented
// when a pod is popped.
// It should be protected by activeQLock.
schedulingCycle int64
// moveRequestCycle caches the sequence number of scheduling cycle when we
// received a move request. Unschedulable pods in and before this scheduling
// cycle will be put back to activeQueue if we were trying to schedule them
@ -234,11 +187,6 @@ type PriorityQueue struct {
// queueingHintMap is keyed with profile name, valued with registered queueing hint functions.
queueingHintMap QueueingHintMapPerProfile
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
// It should be protected by activeQLock.
closed bool
nsLister listersv1.NamespaceLister
metricsRecorder metrics.MetricAsyncRecorder
@ -382,24 +330,23 @@ func NewPriorityQueue(
opt(&options)
}
isSchedulingQueueHintEnabled := utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints)
pq := &PriorityQueue{
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()),
activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
inFlightPods: make(map[types.UID]*list.Element),
inFlightEvents: list.New(),
preEnqueuePluginMap: options.preEnqueuePluginMap,
queueingHintMap: options.queueingHintMap,
metricsRecorder: options.metricsRecorder,
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
moveRequestCycle: -1,
isSchedulingQueueHintEnabled: utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
}
pq.cond.L = &pq.activeQLock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo)
@ -601,11 +548,11 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
gatedBefore := pInfo.Gated
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
p.activeQ.getLock().Lock()
defer p.activeQ.getLock().Unlock()
if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
if p.activeQ.Has(pInfo) {
if p.activeQ.unlocked().Has(pInfo) {
return false
}
if p.podBackoffQ.Has(pInfo) {
@ -619,7 +566,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue
pInfo.InitialAttemptTimestamp = &now
}
p.activeQ.AddOrUpdate(pInfo)
p.activeQ.unlocked().AddOrUpdate(pInfo)
p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
@ -640,7 +587,7 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) {
pInfo := p.newQueuedPodInfo(pod)
if added := p.moveToActiveQ(logger, pInfo, framework.PodAdd); added {
p.cond.Broadcast()
p.activeQ.broadcast()
}
}
@ -657,16 +604,10 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
}
if activated {
p.cond.Broadcast()
p.activeQ.broadcast()
}
}
func (p *PriorityQueue) existsInActiveQ(pInfo *framework.QueuedPodInfo) bool {
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
return p.activeQ.Has(pInfo)
}
func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
var pInfo *framework.QueuedPodInfo
// Verify if the pod is present in unschedulablePods or backoffQ.
@ -701,46 +642,13 @@ func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
// SchedulingCycle returns current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 {
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
return p.schedulingCycle
}
// clusterEventsSinceElementUnlocked gets all cluster events that have happened during this inFlightPod is being scheduled.
// Note: this function assumes activeQLock to be locked by the caller.
func (p *PriorityQueue) clusterEventsSinceElementUnlocked(inFlightPod *list.Element) []*clusterEvent {
var events []*clusterEvent
for event := inFlightPod.Next(); event != nil; event = event.Next() {
e, ok := event.Value.(*clusterEvent)
if !ok {
// Must be another in-flight Pod (*v1.Pod). Can be ignored.
continue
}
events = append(events, e)
}
return events
}
func (p *PriorityQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) {
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods))
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
// So, given pInfo should have been Pop()ed before,
// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
if !ok {
return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
}
return p.clusterEventsSinceElementUnlocked(inFlightPod), nil
return p.activeQ.schedulingCycle()
}
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy {
events, err := p.clusterEventsForPod(logger, pInfo)
events, err := p.activeQ.clusterEventsForPod(logger, pInfo)
if err != nil {
logger.Error(err, "Error getting cluster events for pod", "pod", klog.KObj(pInfo.Pod))
return queueAfterBackoff
@ -834,7 +742,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
}
if p.existsInActiveQ(pInfo) {
if p.activeQ.has(pInfo) {
return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
}
if p.podBackoffQ.Has(pInfo) {
@ -864,7 +772,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", framework.ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint, "unschedulable plugins", rejectorPlugins)
if queue == activeQ {
// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
p.cond.Broadcast()
p.activeQ.broadcast()
}
return nil
@ -895,7 +803,7 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
}
if activated {
p.cond.Broadcast()
p.activeQ.broadcast()
}
}
@ -925,81 +833,18 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
// Note: This method should NOT be locked by the p.lock at any moment,
// as it would lead to scheduling throughput degradation.
func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
for p.activeQ.Len() == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if p.closed {
logger.V(2).Info("Scheduling queue is closed")
return nil, nil
}
p.cond.Wait()
}
pInfo, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
pInfo.Attempts++
p.schedulingCycle++
// In flight, no concurrent events yet.
if p.isSchedulingQueueHintEnabled {
p.inFlightPods[pInfo.Pod.UID] = p.inFlightEvents.PushBack(pInfo.Pod)
}
// Update metrics and reset the set of unschedulable plugins for the next attempt.
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
}
pInfo.UnschedulablePlugins.Clear()
pInfo.PendingPlugins.Clear()
return pInfo, nil
return p.activeQ.pop(logger)
}
// Done must be called for pod returned by Pop. This allows the queue to
// keep track of which pods are currently being processed.
func (p *PriorityQueue) Done(pod types.UID) {
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
p.done(pod)
}
func (p *PriorityQueue) done(pod types.UID) {
if !p.isSchedulingQueueHintEnabled {
// do nothing if schedulingQueueHint is disabled.
// In that case, we don't have inFlightPods and inFlightEvents.
return
}
inFlightPod, ok := p.inFlightPods[pod]
if !ok {
// This Pod is already done()ed.
return
}
delete(p.inFlightPods, pod)
// Remove the pod from the list.
p.inFlightEvents.Remove(inFlightPod)
// Remove events which are only referred to by this Pod
// so that the inFlightEvents list doesn't grow infinitely.
// If the pod was at the head of the list, then all
// events between it and the next pod are no longer needed
// and can be removed.
for {
e := p.inFlightEvents.Front()
if e == nil {
// Empty list.
break
}
if _, ok := e.Value.(*clusterEvent); !ok {
// A pod, must stop pruning.
break
}
p.inFlightEvents.Remove(e)
}
p.activeQ.done(pod)
}
// isPodUpdated checks if the pod is updated in a way that it may have become
@ -1022,12 +867,12 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
}
func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool {
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
if pInfo, exists := p.activeQ.Get(oldPodInfo); exists {
p.activeQ.getLock().Lock()
defer p.activeQ.getLock().Unlock()
if pInfo, exists := p.activeQ.unlocked().Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
p.activeQ.AddOrUpdate(pInfo)
p.activeQ.unlocked().AddOrUpdate(pInfo)
return true
}
return false
@ -1042,25 +887,15 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
defer p.lock.Unlock()
if p.isSchedulingQueueHintEnabled {
p.activeQLock.Lock()
// the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
if _, ok := p.inFlightPods[newPod.UID]; ok {
// The inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
// Record this update as Pod/Update because
// this update may make the Pod schedulable in case it gets rejected and comes back to the queue.
// We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue.
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
if exists := p.activeQ.addEventIfPodInFlight(oldPod, newPod, framework.UnscheduledPodUpdate); exists {
logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod))
// Record this update as Pod/Update because
// this update may make the Pod schedulable in case it gets rejected and comes back to the queue.
// We can clean it up once we change updatePodInSchedulingQueue to call MoveAllToActiveOrBackoffQueue.
// See https://github.com/kubernetes/kubernetes/pull/125578#discussion_r1648338033 for more context.
p.inFlightEvents.PushBack(&clusterEvent{
event: framework.UnscheduledPodUpdate,
oldObj: oldPod,
newObj: newPod,
})
p.activeQLock.Unlock()
return
}
p.activeQLock.Unlock()
}
if oldPod != nil {
@ -1098,7 +933,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
p.unschedulablePods.delete(pInfo.Pod, gated)
}
if queue == activeQ {
p.cond.Broadcast()
p.activeQ.broadcast()
break
}
}
@ -1113,7 +948,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
}
if added := p.moveToActiveQ(logger, pInfo, framework.BackoffComplete); added {
p.cond.Broadcast()
p.activeQ.broadcast()
}
return
}
@ -1125,7 +960,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
// If pod is not in any of the queues, we put it in the active queue.
pInfo := p.newQueuedPodInfo(newPod)
if added := p.moveToActiveQ(logger, pInfo, framework.PodUpdate); added {
p.cond.Broadcast()
p.activeQ.broadcast()
}
}
@ -1136,9 +971,9 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) {
defer p.lock.Unlock()
p.DeleteNominatedPodIfExists(pod)
pInfo := newQueuedPodInfoForLookup(pod)
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
if err := p.activeQ.Delete(pInfo); err != nil {
p.activeQ.getLock().Lock()
defer p.activeQ.getLock().Unlock()
if err := p.activeQ.unlocked().Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(pInfo)
if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
@ -1227,7 +1062,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
return activeQ
}
if pInfo.Gated {
// In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in addToActiveQ.
// In case the pod is gated, the Pod is pushed back to unschedulable Pods pool in moveToActiveQ.
return unschedulablePods
}
@ -1279,24 +1114,19 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
}
}
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
p.moveRequestCycle = p.schedulingCycle
p.moveRequestCycle = p.activeQ.schedulingCycle()
if p.isSchedulingQueueHintEnabled && len(p.inFlightPods) != 0 {
logger.V(5).Info("Event received while pods are in flight", "event", event.Label, "numPods", len(p.inFlightPods))
if p.isSchedulingQueueHintEnabled {
// AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in
// AddUnschedulableIfNotPresent we need to know whether events were
// observed while scheduling them.
p.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldObj,
newObj: newObj,
})
if added := p.activeQ.addEventIfAnyInFlight(oldObj, newObj, event); added {
logger.V(5).Info("Event received while pods are in flight", "event", event.Label)
}
}
if activated {
p.cond.Broadcast()
p.activeQ.broadcast()
}
}
@ -1328,13 +1158,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Lo
// PodsInActiveQ returns all the Pods in the activeQ.
func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod {
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
var result []*v1.Pod
for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.Pod)
}
return result
return p.activeQ.list()
}
var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v"
@ -1345,7 +1169,7 @@ var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v"
func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
p.lock.RLock()
defer p.lock.RUnlock()
result := p.PodsInActiveQ()
result := p.activeQ.list()
activeQLen := len(result)
for _, pInfo := range p.podBackoffQ.List() {
result = append(result, pInfo.Pod)
@ -1356,12 +1180,12 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap))
}
// Note: this function assumes the caller locks p.lock.RLock.
// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo {
pod := np.ToPod()
pInfoLookup := newQueuedPodInfoForLookup(pod)
queuedPodInfo, exists := p.activeQ.Get(pInfoLookup)
queuedPodInfo, exists := p.activeQ.unlocked().Get(pInfoLookup)
if exists {
return queuedPodInfo.PodInfo
}
@ -1382,8 +1206,8 @@ func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo {
func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo {
p.lock.RLock()
defer p.lock.RUnlock()
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
p.activeQ.getLock().RLock()
defer p.activeQ.getLock().RUnlock()
pods := make([]*framework.PodInfo, len(nominatedPods))
for i, np := range nominatedPods {
pods[i] = p.nominatedPodToInfo(np).DeepCopy()
@ -1396,11 +1220,8 @@ func (p *PriorityQueue) Close() {
p.lock.Lock()
defer p.lock.Unlock()
close(p.stop)
p.activeQLock.Lock()
// closed field is locked by activeQLock as it is checked in Pop() without p.lock set.
p.closed = true
p.activeQLock.Unlock()
p.cond.Broadcast()
p.activeQ.close()
p.activeQ.broadcast()
}
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
@ -1571,10 +1392,10 @@ func (np PodRef) ToPod() *v1.Pod {
// by their UID and update/delete them.
type nominator struct {
// nLock synchronizes all operations related to nominator.
// Caution: DO NOT take ("SchedulingQueue.lock" or "SchedulingQueue.activeQLock") after taking "nLock".
// You should always take "SchedulingQueue.lock" and "SchedulingQueue.activeQLock" first,
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
// otherwise the nominator could end up in deadlock.
// Correct locking order is: SchedulingQueue.lock > SchedulingQueue.activeQLock > nLock.
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
nLock sync.RWMutex
// podLister is used to verify if the given pod is alive.

View File

@ -17,7 +17,6 @@ limitations under the License.
package queue
import (
"container/list"
"context"
"fmt"
"math"
@ -172,14 +171,6 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
}
}
func listToValues(l *list.List) []interface{} {
var values []interface{}
for e := l.Front(); e != nil; e = e.Next() {
values = append(values, e.Value)
}
return values
}
func Test_InFlightPods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
pod := st.MakePod().Name("targetpod").UID("pod1").Obj()
@ -715,8 +706,8 @@ func Test_InFlightPods(t *testing.T) {
}
actualInFlightPods := make(map[types.UID]*v1.Pod)
for uid, element := range q.inFlightPods {
actualInFlightPods[uid] = element.Value.(*v1.Pod)
for _, pod := range q.activeQ.listInFlightPods() {
actualInFlightPods[pod.UID] = pod
}
wantInFlightPods := make(map[types.UID]*v1.Pod)
for _, pod := range test.wantInFlightPods {
@ -733,35 +724,36 @@ func Test_InFlightPods(t *testing.T) {
}
wantInFlightEvents = append(wantInFlightEvents, value)
}
if diff := cmp.Diff(wantInFlightEvents, listToValues(q.inFlightEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" {
if diff := cmp.Diff(wantInFlightEvents, q.activeQ.listInFlightEvents(), cmp.AllowUnexported(clusterEvent{})); diff != "" {
t.Errorf("Unexpected diff in inFlightEvents (-want, +got):\n%s", diff)
}
if test.wantActiveQPodNames != nil {
podInfos := q.activeQ.List()
if len(podInfos) != len(test.wantActiveQPodNames) {
diff := cmp.Diff(test.wantActiveQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name
}))
t.Fatalf("Length of activeQ is not expected. Got %v, want %v.\n%s", len(podInfos), len(test.wantActiveQPodNames), diff)
pods := q.activeQ.list()
var podNames []string
for _, pod := range pods {
podNames = append(podNames, pod.Name)
}
if diff := cmp.Diff(test.wantActiveQPodNames, podNames); diff != "" {
t.Fatalf("Unexpected diff of activeQ pod names (-want, +got):\n%s", diff)
}
wantPodNames := sets.New(test.wantActiveQPodNames...)
for _, podInfo := range podInfos {
podGotFromActiveQ := podInfo.Pod
if !wantPodNames.Has(podGotFromActiveQ.Name) {
t.Fatalf("Pod %v was not expected to be in the activeQ.", podGotFromActiveQ.Name)
for _, pod := range pods {
if !wantPodNames.Has(pod.Name) {
t.Fatalf("Pod %v was not expected to be in the activeQ.", pod.Name)
}
}
}
if test.wantBackoffQPodNames != nil {
podInfos := q.podBackoffQ.List()
if len(podInfos) != len(test.wantBackoffQPodNames) {
diff := cmp.Diff(test.wantBackoffQPodNames, podInfos, cmpopts.SortSlices(func(a, b interface{}) bool {
return a.(framework.PodInfo).Pod.Name < b.(framework.PodInfo).Pod.Name
}))
t.Fatalf("Length of backoffQ is not expected. Got %v, want %v.\n%s", len(podInfos), len(test.wantBackoffQPodNames), diff)
var podNames []string
for _, pInfo := range podInfos {
podNames = append(podNames, pInfo.Pod.Name)
}
if diff := cmp.Diff(test.wantBackoffQPodNames, podNames); diff != "" {
t.Fatalf("Unexpected diff of backoffQ pod names (-want, +got):\n%s", diff)
}
wantPodNames := sets.New(test.wantBackoffQPodNames...)
@ -1073,7 +1065,7 @@ func TestPriorityQueue_Update(t *testing.T) {
prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) {
podInfo := q.newQueuedPodInfo(medPriorityPodInfo.Pod)
// We need to once add this Pod to activeQ and Pop() it so that this Pod is registered correctly in inFlightPods.
q.activeQ.AddOrUpdate(podInfo)
q.activeQ.unlocked().AddOrUpdate(podInfo)
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -1109,7 +1101,7 @@ func TestPriorityQueue_Update(t *testing.T) {
pInfo = pInfoFromBackoff
}
if pInfoFromActive, exists := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
if pInfoFromActive, exists := q.activeQ.unlocked().Get(newQueuedPodInfoForLookup(newPod)); exists {
if tt.wantQ != activeQ {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name)
}
@ -1204,10 +1196,10 @@ func TestPriorityQueue_Delete(t *testing.T) {
q.Update(logger, highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(logger, unschedulablePodInfo.Pod)
q.Delete(highPriNominatedPodInfo.Pod)
if !q.activeQ.Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) {
if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)) {
t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name)
}
if q.activeQ.Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) {
if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)) {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name)
}
if len(q.nominator.nominatedPods) != 1 {
@ -1263,7 +1255,7 @@ func TestPriorityQueue_Activate(t *testing.T) {
// Prepare activeQ/unschedulablePods/podBackoffQ according to the table
for _, qPodInfo := range tt.qPodInfoInActiveQ {
q.activeQ.AddOrUpdate(qPodInfo)
q.activeQ.unlocked().AddOrUpdate(qPodInfo)
}
for _, qPodInfo := range tt.qPodInfoInUnschedulablePods {
@ -1278,13 +1270,13 @@ func TestPriorityQueue_Activate(t *testing.T) {
q.Activate(logger, map[string]*v1.Pod{"test_pod": tt.qPodInfoToActivate.PodInfo.Pod})
// Check the result after activation by the length of activeQ
if wantLen := len(tt.want); q.activeQ.Len() != wantLen {
t.Errorf("length compare: want %v, got %v", wantLen, q.activeQ.Len())
if wantLen := len(tt.want); q.activeQ.len() != wantLen {
t.Errorf("length compare: want %v, got %v", wantLen, q.activeQ.len())
}
// Check if the specific pod exists in activeQ
for _, want := range tt.want {
if !q.activeQ.Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) {
if !q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(want.PodInfo.Pod)) {
t.Errorf("podInfo not exist in activeQ: want %v", want.PodInfo.Pod.Name)
}
}
@ -1570,7 +1562,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
}
cl := testingclock.NewFakeClock(now)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(test.podInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
}
@ -1587,7 +1579,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
t.Fatalf("expected pod to be queued to backoffQ, but it was not")
}
if q.activeQ.Len() == 0 && test.expectedQ == activeQ {
if q.activeQ.len() == 0 && test.expectedQ == activeQ {
t.Fatalf("expected pod to be queued to activeQ, but it was not")
}
@ -1614,12 +1606,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
}
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID)
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -1635,7 +1627,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q)
// Construct a Pod, but don't associate its scheduler failure to any plugin
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
@ -1648,7 +1640,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
expectInFlightPods(t, q)
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp2))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp2))
if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
}
@ -1663,8 +1655,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
// because of the queueing hint function registered for NodeAdd/fooPlugin.
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
q.Add(logger, medPriorityPodInfo.Pod)
if q.activeQ.Len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
if q.activeQ.len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.len())
}
// Pop out the medPriorityPodInfo in activeQ.
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
@ -1683,18 +1675,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
q.schedulingCycle++
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID)
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(hpp1))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
@ -1734,8 +1725,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
c.Step(q.podInitialBackoffDuration)
q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
if q.activeQ.Len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len())
if q.activeQ.len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len())
}
if q.podBackoffQ.Len() != 0 {
t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
@ -1789,8 +1780,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
// This NodeAdd event moves unschedulablePodInfo and highPriorityPodInfo to the backoffQ,
// because of the queueing hint function registered for NodeAdd/fooPlugin.
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
if q.activeQ.Len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
if q.activeQ.len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.len())
}
// Pop out the medPriorityPodInfo in activeQ.
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
@ -1810,7 +1801,6 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
}
}
q.schedulingCycle++
unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")
highPriorityQueuedPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")
hpp1QueuedPodInfo := q.newQueuedPodInfo(hpp1)
@ -1841,8 +1831,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithOutQueueingHint(t *testi
c.Step(q.podInitialBackoffDuration)
q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
q.MoveAllToActiveOrBackoffQueue(logger, framework.NodeAdd, nil, nil, nil)
if q.activeQ.Len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len())
if q.activeQ.len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.len())
}
if q.podBackoffQ.Len() != 0 {
t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
@ -1863,16 +1853,17 @@ func clonePod(pod *v1.Pod, newName string) *v1.Pod {
func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) {
t.Helper()
var actualUIDs []types.UID
for uid := range q.inFlightPods {
actualUIDs = append(actualUIDs, uid)
for _, pod := range q.activeQ.listInFlightPods() {
actualUIDs = append(actualUIDs, pod.UID)
}
sortUIDs := cmpopts.SortSlices(func(a, b types.UID) bool { return a < b })
if diff := cmp.Diff(uids, actualUIDs, sortUIDs); diff != "" {
t.Fatalf("Unexpected content of inFlightPods (-want, +have):\n%s", diff)
}
actualUIDs = nil
for e := q.inFlightEvents.Front(); e != nil; e = e.Next() {
if pod, ok := e.Value.(*v1.Pod); ok {
events := q.activeQ.listInFlightEvents()
for _, e := range events {
if pod, ok := e.(*v1.Pod); ok {
actualUIDs = append(actualUIDs, pod.UID)
}
}
@ -1954,7 +1945,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(tt.unschedPod))
if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod {
t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name)
}
@ -1969,7 +1960,7 @@ func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
q.AssignedPodAdded(logger, tt.updatedAssignedPod)
if q.activeQ.Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue {
if q.activeQ.unlocked().Has(newQueuedPodInfoForLookup(tt.unschedPod)) != tt.wantToRequeue {
t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, !tt.wantToRequeue)
}
})
@ -2069,11 +2060,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -2419,7 +2410,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
})
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(unschedulablePod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(unschedulablePod))
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
}
@ -2557,11 +2548,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
})
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(highPod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(highPod))
if p, err := q.Pop(logger); err != nil || p.Pod != highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name)
}
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(midPod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(midPod))
if p, err := q.Pop(logger); err != nil || p.Pod != midPod {
t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name)
}
@ -2610,7 +2601,7 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) {
operations: []operation{
addPodUnschedulablePods,
addPodUnschedulablePods,
flushUnschedulerQ,
flushUnscheduledQ,
},
operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil},
expected: []*framework.QueuedPodInfo{pInfo2, pInfo1},
@ -2621,7 +2612,7 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) {
operations: []operation{
addPodUnschedulablePods,
addPodUnschedulablePods,
flushUnschedulerQ,
flushUnscheduledQ,
},
operands: []*framework.QueuedPodInfo{pInfo1, pInfo2, nil},
expected: []*framework.QueuedPodInfo{pInfo2, pInfo1},
@ -2650,15 +2641,17 @@ func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) {
}
expectedLen := len(test.expected)
if queue.activeQ.Len() != expectedLen {
t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.Len())
if queue.activeQ.len() != expectedLen {
t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.len())
}
for i := 0; i < expectedLen; i++ {
if pInfo, err := queue.activeQ.Pop(); err != nil {
if pInfo, err := queue.activeQ.pop(logger); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
} else {
podInfoList = append(podInfoList, pInfo)
// Reset the Attempts counter that was incremented in activeQ.pop().
pInfo.Attempts = 0
}
}
@ -2679,7 +2672,7 @@ var (
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
// UnschedulablePlugins will get cleared by Pop, so make a copy first.
unschedulablePlugins := pInfo.UnschedulablePlugins.Clone()
queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
p, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err)
@ -2695,7 +2688,7 @@ var (
}
popAndRequeueAsBackoff = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
queue.activeQ.AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
queue.activeQ.unlocked().AddOrUpdate(queue.newQueuedPodInfo(pInfo.Pod))
p, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err)
@ -2709,7 +2702,7 @@ var (
}
}
addPodActiveQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
queue.activeQ.AddOrUpdate(pInfo)
queue.activeQ.unlocked().AddOrUpdate(pInfo)
}
addPodUnschedulablePods = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
if !pInfo.Gated {
@ -2744,7 +2737,7 @@ var (
moveClockForward = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(2 * time.Second)
}
flushUnschedulerQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
flushUnscheduledQ = func(t *testing.T, logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(queue.podMaxInUnschedulablePodsDuration)
queue.flushUnschedulablePodsLeftover(logger)
}
@ -2817,15 +2810,17 @@ func TestPodTimestamp(t *testing.T) {
}
expectedLen := len(test.expected)
if queue.activeQ.Len() != expectedLen {
t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.Len())
if queue.activeQ.len() != expectedLen {
t.Fatalf("Expected %v items to be in activeQ, but got: %v", expectedLen, queue.activeQ.len())
}
for i := 0; i < expectedLen; i++ {
if pInfo, err := queue.activeQ.Pop(); err != nil {
if pInfo, err := queue.activeQ.pop(logger); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
} else {
podInfoList = append(podInfoList, pInfo)
// Reset the Attempts counter that was incremented in activeQ.pop().
pInfo.Attempts = 0
}
}
@ -3453,12 +3448,12 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort())
for i, podInfo := range tt.podInfos {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod))
q.activeQ.unlocked().AddOrUpdate(q.newQueuedPodInfo(podInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
}
podInfo.UnschedulablePlugins = sets.New("plugin")
err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.schedulingCycle)
err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.activeQ.schedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}