diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 97e98526e42..1a3a0877663 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -383,7 +383,8 @@ func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) { p.podBackoff.ClearPodBackoff(nsNameForPod(pod)) } -// isPodBackingOff returns whether a pod is currently undergoing backoff in the podBackoff structure +// isPodBackingOff returns true if a pod is still waiting for its backoff timer. +// If this returns true, the pod should not be re-tried. func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool { boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) if !exists { @@ -411,11 +412,10 @@ func (p *PriorityQueue) SchedulingCycle() int64 { return p.schedulingCycle } -// AddUnschedulableIfNotPresent does nothing if the pod is present in any -// queue. If pod is unschedulable, it adds pod to unschedulable queue if -// p.moveRequestCycle > podSchedulingCycle or to backoff queue if p.moveRequestCycle -// <= podSchedulingCycle but pod is subject to backoff. In other cases, it adds pod to -// active queue. +// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into +// the queue, unless it is already in the queue. Normally, PriorityQueue puts +// unschedulable pods in `unschedulableQ`. But if there has been a recent move +// request, then the pod is put in `podBackoffQ`. func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error { p.lock.Lock() defer p.lock.Unlock() @@ -430,30 +430,26 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC if _, exists, _ := p.podBackoffQ.Get(pInfo); exists { return fmt.Errorf("pod is already present in the backoffQ") } - if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) { - p.backoffPod(pod) - p.unschedulableQ.addOrUpdate(pInfo) - p.nominatedPods.add(pod, "") - return nil - } - // If a move request has been received and the pod is subject to backoff, move it to the BackoffQ. - if p.isPodBackingOff(pod) && isPodUnschedulable(pod) { - err := p.podBackoffQ.Add(pInfo) - if err != nil { - klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) - } else { - p.nominatedPods.add(pod, "") + // Every unschedulable pod is subject to backoff timers. + p.backoffPod(pod) + + // If a move request has been received, move it to the BackoffQ, otherwise move + // it to unschedulableQ. + if p.moveRequestCycle >= podSchedulingCycle { + if err := p.podBackoffQ.Add(pInfo); err != nil { + // TODO: Delete this klog call and log returned errors at the call site. + err = fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err) + klog.Error(err) + return err } - return err + } else { + p.unschedulableQ.addOrUpdate(pInfo) } - err := p.activeQ.Add(pInfo) - if err == nil { - p.nominatedPods.add(pod, "") - p.cond.Broadcast() - } - return err + p.nominatedPods.add(pod, "") + return nil + } // flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 9f7599c86fb..c33f8547725 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -184,16 +184,14 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { q := NewPriorityQueue(nil) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything. - q.AddUnschedulableIfNotPresent(&medPriorityPod, q.SchedulingCycle()) // This should go to activeQ. q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) expectedNominatedPods := &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ - medPriorityPod.UID: "node1", unschedulablePod.UID: "node1", highPriNominatedPod.UID: "node1", }, nominatedPods: map[string][]*v1.Pod{ - "node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod}, + "node1": {&highPriNominatedPod, &unschedulablePod}, }, } if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { @@ -202,9 +200,6 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { if p, err := q.Pop(); err != nil || p != &highPriNominatedPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Name) } - if p, err := q.Pop(); err != nil || p != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) - } if len(q.nominatedPods.nominatedPods) != 1 { t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods) } @@ -213,11 +208,11 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { } } -// TestPriorityQueue_AddUnschedulableIfNotPresent_Async tests scenario when +// TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff tests scenario when // AddUnschedulableIfNotPresent is called asynchronously pods in and before // current scheduling cycle will be put back to activeQueue if we were trying // to schedule them when we received move request. -func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) { +func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { q := NewPriorityQueue(nil) totalNum := 10 expectedPods := make([]v1.Pod, 0, totalNum) @@ -248,10 +243,14 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) { // move all pods to active queue when we were trying to schedule them q.MoveAllToActiveQueue() - moveReqChan := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(totalNum - 1) - // mark pods[1] ~ pods[totalNum-1] as unschedulable, fire goroutines to add them back later + oldCycle := q.SchedulingCycle() + + firstPod, _ := q.Pop() + if !reflect.DeepEqual(&expectedPods[0], firstPod) { + t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod) + } + + // mark pods[1] ~ pods[totalNum-1] as unschedulable and add them back for i := 1; i < totalNum; i++ { unschedulablePod := expectedPods[i].DeepCopy() unschedulablePod.Status = v1.PodStatus{ @@ -263,24 +262,15 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) { }, }, } - cycle := q.SchedulingCycle() - go func() { - <-moveReqChan - q.AddUnschedulableIfNotPresent(unschedulablePod, cycle) - wg.Done() - }() + + q.AddUnschedulableIfNotPresent(unschedulablePod, oldCycle) } - firstPod, _ := q.Pop() - if !reflect.DeepEqual(&expectedPods[0], firstPod) { - t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod) - } - // close moveReqChan here to make sure q.AddUnschedulableIfNotPresent is called after another pod is popped - close(moveReqChan) - wg.Wait() - // all other pods should be in active queue again + + // Since there was a move request at the same cycle as "oldCycle", these pods + // should be in the backoff queue. for i := 1; i < totalNum; i++ { - if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&expectedPods[i])); !exists { - t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name) + if _, exists, _ := q.podBackoffQ.Get(newPodInfoNoTimestamp(&expectedPods[i])); !exists { + t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name) } } }