diff --git a/pkg/scheduler/internal/queue/active_queue.go b/pkg/scheduler/internal/queue/active_queue.go index 4eb19a9448f..223c7229cdd 100644 --- a/pkg/scheduler/internal/queue/active_queue.go +++ b/pkg/scheduler/internal/queue/active_queue.go @@ -36,8 +36,10 @@ import ( // and it is forbidden to call any other activeQueuer's method under this lock. type activeQueuer interface { underLock(func(unlockedActiveQ unlockedActiveQueuer)) - underRLock(func(unlockedActiveQ unlockedActiveQueuer)) + underRLock(func(unlockedActiveQ unlockedActiveQueueReader)) + update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo + delete(pInfo *framework.QueuedPodInfo) error pop(logger klog.Logger) (*framework.QueuedPodInfo, error) list() []*v1.Pod len() int @@ -56,12 +58,17 @@ type activeQueuer interface { } // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. -// underLock() and underRLock() methods should be used to protect these methods. +// underLock() method should be used to protect these methods. type unlockedActiveQueuer interface { + unlockedActiveQueueReader + AddOrUpdate(pInfo *framework.QueuedPodInfo) +} + +// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself. +// underLock() or underRLock() method should be used to protect these methods. +type unlockedActiveQueueReader interface { Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) Has(pInfo *framework.QueuedPodInfo) bool - AddOrUpdate(pInfo *framework.QueuedPodInfo) - Delete(pInfo *framework.QueuedPodInfo) error } // activeQueue implements activeQueuer. All of the fields have to be protected using the lock. @@ -140,14 +147,36 @@ func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) } // underLock runs the fn function under the lock.RLock. -// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method, +// fn can run unlockedActiveQueueReader methods but should NOT run any other activeQueue method, // as it would end up in deadlock. -func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueuer)) { +func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) { aq.lock.RLock() defer aq.lock.RUnlock() fn(aq.queue) } +// update updates the pod in activeQ if oldPodInfo is already in the queue. +// It returns new pod info if updated, nil otherwise. +func (aq *activeQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo { + aq.lock.Lock() + defer aq.lock.Unlock() + + if pInfo, exists := aq.queue.Get(oldPodInfo); exists { + _ = pInfo.Update(newPod) + aq.queue.AddOrUpdate(pInfo) + return pInfo + } + return nil +} + +// delete deletes the pod info from activeQ. +func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error { + aq.lock.Lock() + defer aq.lock.Unlock() + + return aq.queue.Delete(pInfo) +} + // pop removes the head of the queue and returns it. // It blocks if the queue is empty and waits until a new item is added to the queue. // It increments scheduling cycle when a pod is popped. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index ed0a4dc8577..25b1e993842 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -860,19 +860,6 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { return !reflect.DeepEqual(strip(oldPod), strip(newPod)) } -func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool { - exists := false - p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - if pInfo, exists := unlockedActiveQ.Get(oldPodInfo); exists { - _ = pInfo.Update(newPod) - p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) - unlockedActiveQ.AddOrUpdate(pInfo) - exists = true - } - }) - return exists -} - // Update updates a pod in the active or backoff queue if present. Otherwise, it removes // the item from the unschedulable queue if pod is updated in a way that it may // become schedulable and adds the updated one to the active queue. @@ -896,7 +883,8 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { if oldPod != nil { oldPodInfo := newQueuedPodInfoForLookup(oldPod) // If the pod is already in the active queue, just update it there. - if exists := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists { + if pInfo := p.activeQ.update(newPod, oldPodInfo); pInfo != nil { + p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) return } @@ -966,15 +954,13 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) { defer p.lock.Unlock() p.DeleteNominatedPodIfExists(pod) pInfo := newQueuedPodInfoForLookup(pod) - p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) { - if err := unlockedActiveQ.Delete(pInfo); err != nil { - // The item was probably not found in the activeQ. - p.podBackoffQ.Delete(pInfo) - if pInfo = p.unschedulablePods.get(pod); pInfo != nil { - p.unschedulablePods.delete(pod, pInfo.Gated) - } + if err := p.activeQ.delete(pInfo); err != nil { + // The item was probably not found in the activeQ. + p.podBackoffQ.Delete(pInfo) + if pInfo = p.unschedulablePods.get(pod); pInfo != nil { + p.unschedulablePods.delete(pod, pInfo.Gated) } - }) + } } // AssignedPodAdded is called when a bound pod is added. Creation of this pod @@ -1176,7 +1162,7 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { } // Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock. -func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueuer) *framework.PodInfo { +func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueueReader) *framework.PodInfo { pod := np.toPod() pInfoLookup := newQueuedPodInfoForLookup(pod) @@ -1216,7 +1202,7 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn nominatedPods := p.nominator.nominatedPodsForNode(nodeName) pods := make([]*framework.PodInfo, len(nominatedPods)) - p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { for i, np := range nominatedPods { pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy() } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index bdf4e037ee3..19b4ad1f5f7 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1104,7 +1104,7 @@ func TestPriorityQueue_Update(t *testing.T) { pInfo = pInfoFromBackoff } - q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) { + q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) { if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists { if tt.wantQ != activeQ { t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name)