Add activeQLock to scheduling queue to improve Pop() throughput

This commit is contained in:
Maciej Skoczeń 2024-06-25 09:47:59 +00:00
parent 7127246344
commit 31e89b1f4d
2 changed files with 125 additions and 69 deletions

View File

@ -163,8 +163,17 @@ type PriorityQueue struct {
// the maximum time a pod can stay in the unschedulablePods.
podMaxInUnschedulablePodsDuration time.Duration
// cond is a condition that is notified when the pod is added to activeQ.
// It is used with activeQLock.
cond sync.Cond
// activeQLock synchronizes all operations related to activeQ.
// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
// Caution: DO NOT take nominator.lock after taking activeQLock,
// you should take nominator.lock first if you need two locks,
// otherwise the queue could end up deadlock.
activeQLock sync.RWMutex
// inFlightPods holds the UID of all pods which have been popped out for which Done
// hasn't been called yet - in other words, all pods that are currently being
// processed (being scheduled, in permit, or in the binding cycle).
@ -172,6 +181,8 @@ type PriorityQueue struct {
// The values in the map are the entry of each pod in the inFlightEvents list.
// The value of that entry is the *v1.Pod at the time that scheduling of that
// pod started, which can be useful for logging or debugging.
//
// It should be protected by activeQLock.
inFlightPods map[types.UID]*list.Element
// inFlightEvents holds the events received by the scheduling queue
@ -187,10 +198,12 @@ type PriorityQueue struct {
// After removal of a pod, events at the start of the list are no
// longer needed because all of the other in-flight pods started
// later. Those events can be removed.
//
// It should be protected by activeQLock.
inFlightEvents *list.List
// activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod.
// schedule. Head of heap is the highest priority pod. It should be protected by activeQLock.
activeQ *heap.Heap
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
@ -199,6 +212,7 @@ type PriorityQueue struct {
unschedulablePods *UnschedulablePods
// schedulingCycle represents sequence number of scheduling cycle and is incremented
// when a pod is popped.
// It should be protected by activeQLock.
schedulingCycle int64
// moveRequestCycle caches the sequence number of scheduling cycle when we
// received a move request. Unschedulable pods in and before this scheduling
@ -214,6 +228,7 @@ type PriorityQueue struct {
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
// It should be protected by activeQLock.
closed bool
nsLister listersv1.NamespaceLister
@ -383,7 +398,7 @@ func NewPriorityQueue(
moveRequestCycle: -1,
isSchedulingQueueHintEnabled: utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
}
pq.cond.L = &pq.lock
pq.cond.L = &pq.activeQLock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
@ -555,13 +570,24 @@ func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.Pr
return s
}
// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
// moveToActiveQ tries to add pod to active queue and remove it from unschedulable and backoff queues.
// It returns 2 parameters:
// 1. a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.
func (p *PriorityQueue) addToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo) (bool, error) {
func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) (bool, error) {
gatedBefore := pInfo.Gated
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return false, nil
}
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return false, nil
}
p.unschedulablePods.addOrUpdate(pInfo)
return false, nil
}
@ -569,10 +595,20 @@ func (p *PriorityQueue) addToActiveQ(logger klog.Logger, pInfo *framework.Queued
now := p.clock.Now()
pInfo.InitialAttemptTimestamp = &now
}
if err := p.activeQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
return false, err
}
p.unschedulablePods.delete(pInfo.Pod, gatedBefore)
_ = p.podBackoffQ.Delete(pInfo) // Don't need to react when pInfo is not found.
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
if event == PodAdd || event == PodUpdate {
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
}
return true, nil
}
@ -583,21 +619,9 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error {
defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod)
gated := pInfo.Gated
if added, err := p.addToActiveQ(logger, pInfo); !added {
if added, err := p.moveToActiveQ(logger, pInfo, PodAdd); !added {
return err
}
if p.unschedulablePods.get(pod) != nil {
logger.Error(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
p.unschedulablePods.delete(pod, gated)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pInfo); err == nil {
logger.Error(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
}
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
p.cond.Broadcast()
return nil
@ -620,9 +644,16 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
}
}
func (p *PriorityQueue) existsInActiveQ(pInfo *framework.QueuedPodInfo) bool {
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
_, exists, _ := p.activeQ.Get(pInfo)
return exists
}
func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
// Verify if the pod is present in activeQ.
if _, exists, _ := p.activeQ.Get(newQueuedPodInfoForLookup(pod)); exists {
if p.existsInActiveQ(newQueuedPodInfoForLookup(pod)) {
// No need to activate if it's already present in activeQ.
return false
}
@ -644,15 +675,8 @@ func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
return false
}
gated := pInfo.Gated
if added, _ := p.addToActiveQ(logger, pInfo); !added {
return false
}
p.unschedulablePods.delete(pInfo.Pod, gated)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
return true
added, _ := p.moveToActiveQ(logger, pInfo, ForceActivate)
return added
}
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
@ -667,14 +691,29 @@ func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
// SchedulingCycle returns current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 {
p.lock.RLock()
defer p.lock.RUnlock()
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
return p.schedulingCycle
}
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy {
// clusterEventsSinceElementUnlocked gets all cluster events that have happened during this inFlightPod is being scheduled.
// Note: this function assumes activeQLock to be locked by the caller.
func (p *PriorityQueue) clusterEventsSinceElementUnlocked(inFlightPod *list.Element) []*clusterEvent {
var events []*clusterEvent
for event := inFlightPod.Next(); event != nil; event = event.Next() {
e, ok := event.Value.(*clusterEvent)
if !ok {
// Must be another in-flight Pod (*v1.Pod). Can be ignored.
continue
}
events = append(events, e)
}
return events
}
func (p *PriorityQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) {
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods))
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
@ -682,7 +721,18 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
if !ok {
logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
}
return p.clusterEventsSinceElementUnlocked(inFlightPod), nil
}
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy {
events, err := p.clusterEventsForPod(logger, pInfo)
if err != nil {
logger.Error(err, "Error getting cluster events for pod", "pod", klog.KObj(pInfo.Pod))
return queueAfterBackoff
}
@ -696,12 +746,7 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
queueingStrategy := queueSkip
for event := inFlightPod.Next(); event != nil; event = event.Next() {
e, ok := event.Value.(*clusterEvent)
if !ok {
// Must be another in-flight Pod (*v1.Pod). Can be ignored.
continue
}
for _, e := range events {
logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label)
switch p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj) {
@ -775,14 +820,14 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
defer p.lock.Unlock()
// In any case, this Pod will be moved back to the queue and we should call Done.
defer p.done(pInfo.Pod.UID)
defer p.Done(pInfo.Pod.UID)
pod := pInfo.Pod
if p.unschedulablePods.get(pod) != nil {
return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
}
if _, exists, _ := p.activeQ.Get(pInfo); exists {
if p.existsInActiveQ(pInfo) {
return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
}
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
@ -839,9 +884,7 @@ func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
if added, _ := p.addToActiveQ(logger, pInfo); added {
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
if added, _ := p.moveToActiveQ(logger, pInfo, BackoffComplete); added {
activated = true
}
}
@ -874,9 +917,11 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
// Note: This method should NOT be locked by the p.lock at any moment,
// as it would lead to scheduling throughput degradation.
func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
for p.activeQ.Len() == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the p.closed is set and the condition is broadcast,
@ -912,8 +957,8 @@ func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error
// Done must be called for pod returned by Pop. This allows the queue to
// keep track of which pods are currently being processed.
func (p *PriorityQueue) Done(pod types.UID) {
p.lock.Lock()
defer p.lock.Unlock()
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
p.done(pod)
}
@ -972,6 +1017,17 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}
func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) (bool, error) {
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
return true, p.activeQ.Update(pInfo)
}
return false, nil
}
// Update updates a pod in the active or backoff queue if present. Otherwise, it removes
// the item from the unschedulable queue if pod is updated in a way that it may
// become schedulable and adds the updated one to the active queue.
@ -981,6 +1037,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
defer p.lock.Unlock()
if p.isSchedulingQueueHintEnabled {
p.activeQLock.Lock()
// the inflight pod will be requeued using the latest version from the informer cache, which matches what the event delivers.
if _, ok := p.inFlightPods[newPod.UID]; ok {
logger.V(6).Info("The pod doesn't be queued for now because it's being scheduled and will be queued back if necessary", "pod", klog.KObj(newPod))
@ -995,17 +1052,17 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
newObj: newPod,
})
p.activeQLock.Unlock()
return nil
}
p.activeQLock.Unlock()
}
if oldPod != nil {
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
// If the pod is already in the active queue, just update it there.
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
return p.activeQ.Update(pInfo)
if exists, err := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists {
return err
}
// If the pod is in the backoff queue, update it there.
@ -1048,11 +1105,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
return nil
}
if added, err := p.addToActiveQ(logger, pInfo); !added {
if added, err := p.moveToActiveQ(logger, pInfo, BackoffComplete); !added {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod, gated)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQ)
p.cond.Broadcast()
return nil
}
@ -1063,11 +1118,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
}
// If pod is not in any of the queues, we put it in the active queue.
pInfo := p.newQueuedPodInfo(newPod)
if added, err := p.addToActiveQ(logger, pInfo); !added {
if added, err := p.moveToActiveQ(logger, pInfo, PodUpdate); !added {
return err
}
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", activeQ)
p.cond.Broadcast()
return nil
}
@ -1079,6 +1132,8 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
defer p.lock.Unlock()
p.deleteNominatedPodIfExistsUnlocked(pod)
pInfo := newQueuedPodInfoForLookup(pod)
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
if err := p.activeQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(pInfo)
@ -1186,12 +1241,11 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
// Reach here if schedulingHint is QueueImmediately, or schedulingHint is Queue but the pod is not backing off.
added, err := p.addToActiveQ(logger, pInfo)
added, err := p.moveToActiveQ(logger, pInfo, event)
if err != nil {
logger.Error(err, "Error adding pod to the active queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
}
if added {
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
return activeQ
}
if pInfo.Gated {
@ -1247,6 +1301,8 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
}
}
p.activeQLock.Lock()
defer p.activeQLock.Unlock()
p.moveRequestCycle = p.schedulingCycle
if p.isSchedulingQueueHintEnabled && len(p.inFlightPods) != 0 {
@ -1293,10 +1349,9 @@ func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Lo
}
// PodsInActiveQ returns all the Pods in the activeQ.
// This function is only used in tests.
func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
p.activeQLock.RLock()
defer p.activeQLock.RUnlock()
var result []*v1.Pod
for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
@ -1312,17 +1367,15 @@ var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v"
func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
p.lock.RLock()
defer p.lock.RUnlock()
var result []*v1.Pod
for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
}
result := p.PodsInActiveQ()
activeQLen := len(result)
for _, pInfo := range p.podBackoffQ.List() {
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
}
for _, pInfo := range p.unschedulablePods.podInfoMap {
result = append(result, pInfo.Pod)
}
return result, fmt.Sprintf(pendingPodsSummary, p.activeQ.Len(), p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap))
return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap))
}
// Close closes the priority queue.
@ -1330,7 +1383,10 @@ func (p *PriorityQueue) Close() {
p.lock.Lock()
defer p.lock.Unlock()
close(p.stop)
p.activeQLock.Lock()
// closed field is locked by activeQLock as it is checked in Pop() without p.lock set.
p.closed = true
p.activeQLock.Unlock()
p.cond.Broadcast()
}

View File

@ -1405,7 +1405,7 @@ func TestPriorityQueue_addToActiveQ(t *testing.T) {
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m),
WithPodInitialBackoffDuration(time.Second*30), WithPodMaxBackoffDuration(time.Second*60))
got, _ := q.addToActiveQ(logger, q.newQueuedPodInfo(tt.pod))
got, _ := q.moveToActiveQ(logger, q.newQueuedPodInfo(tt.pod), PodAdd)
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}