Make update and delete active queue methods

This commit is contained in:
Maciej Skoczeń 2024-08-20 09:21:06 +00:00
parent 9773a39b28
commit 3eefd62f94
3 changed files with 46 additions and 31 deletions

View File

@ -36,8 +36,10 @@ import (
// and it is forbidden to call any other activeQueuer's method under this lock.
type activeQueuer interface {
underLock(func(unlockedActiveQ unlockedActiveQueuer))
underRLock(func(unlockedActiveQ unlockedActiveQueuer))
underRLock(func(unlockedActiveQ unlockedActiveQueueReader))
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
delete(pInfo *framework.QueuedPodInfo) error
pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
list() []*v1.Pod
len() int
@ -56,12 +58,17 @@ type activeQueuer interface {
}
// unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself.
// underLock() and underRLock() methods should be used to protect these methods.
// underLock() method should be used to protect these methods.
type unlockedActiveQueuer interface {
unlockedActiveQueueReader
AddOrUpdate(pInfo *framework.QueuedPodInfo)
}
// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself.
// underLock() or underRLock() method should be used to protect these methods.
type unlockedActiveQueueReader interface {
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
Has(pInfo *framework.QueuedPodInfo) bool
AddOrUpdate(pInfo *framework.QueuedPodInfo)
Delete(pInfo *framework.QueuedPodInfo) error
}
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
@ -140,14 +147,36 @@ func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer))
}
// underLock runs the fn function under the lock.RLock.
// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method,
// fn can run unlockedActiveQueueReader methods but should NOT run any other activeQueue method,
// as it would end up in deadlock.
func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueuer)) {
func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) {
aq.lock.RLock()
defer aq.lock.RUnlock()
fn(aq.queue)
}
// update updates the pod in activeQ if oldPodInfo is already in the queue.
// It returns new pod info if updated, nil otherwise.
func (aq *activeQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
aq.lock.Lock()
defer aq.lock.Unlock()
if pInfo, exists := aq.queue.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
aq.queue.AddOrUpdate(pInfo)
return pInfo
}
return nil
}
// delete deletes the pod info from activeQ.
func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error {
aq.lock.Lock()
defer aq.lock.Unlock()
return aq.queue.Delete(pInfo)
}
// pop removes the head of the queue and returns it.
// It blocks if the queue is empty and waits until a new item is added to the queue.
// It increments scheduling cycle when a pod is popped.

View File

@ -860,19 +860,6 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}
func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) bool {
exists := false
p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
if pInfo, exists := unlockedActiveQ.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
unlockedActiveQ.AddOrUpdate(pInfo)
exists = true
}
})
return exists
}
// Update updates a pod in the active or backoff queue if present. Otherwise, it removes
// the item from the unschedulable queue if pod is updated in a way that it may
// become schedulable and adds the updated one to the active queue.
@ -896,7 +883,8 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
if oldPod != nil {
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
// If the pod is already in the active queue, just update it there.
if exists := p.updateInActiveQueue(logger, oldPod, newPod, oldPodInfo); exists {
if pInfo := p.activeQ.update(newPod, oldPodInfo); pInfo != nil {
p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo)
return
}
@ -966,15 +954,13 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) {
defer p.lock.Unlock()
p.DeleteNominatedPodIfExists(pod)
pInfo := newQueuedPodInfoForLookup(pod)
p.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
if err := unlockedActiveQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(pInfo)
if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
if err := p.activeQ.delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(pInfo)
if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
})
}
}
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
@ -1176,7 +1162,7 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) {
}
// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueuer) *framework.PodInfo {
func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueueReader) *framework.PodInfo {
pod := np.toPod()
pInfoLookup := newQueuedPodInfoForLookup(pod)
@ -1216,7 +1202,7 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodIn
nominatedPods := p.nominator.nominatedPodsForNode(nodeName)
pods := make([]*framework.PodInfo, len(nominatedPods))
p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) {
p.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) {
for i, np := range nominatedPods {
pods[i] = p.nominatedPodToInfo(np, unlockedActiveQ).DeepCopy()
}

View File

@ -1104,7 +1104,7 @@ func TestPriorityQueue_Update(t *testing.T) {
pInfo = pInfoFromBackoff
}
q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueuer) {
q.activeQ.underRLock(func(unlockedActiveQ unlockedActiveQueueReader) {
if pInfoFromActive, exists := unlockedActiveQ.Get(newQueuedPodInfoForLookup(newPod)); exists {
if tt.wantQ != activeQ {
t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name)