mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
Ensure that pods obey backoff timers.
The function AddUnschedulableIfNotPresent is responsible for initializing or updating backoff timers for pods that could not be scheduled. The helper function backoffPod does that work, but was not being called in all cases. This moves that call to be (mostly) unconditional, while cleaning up comments and error handling.
This commit is contained in:
parent
4b8ecd68f3
commit
df4d65d2e1
@ -383,7 +383,8 @@ func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
|
|||||||
p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
|
p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
|
||||||
}
|
}
|
||||||
|
|
||||||
// isPodBackingOff returns whether a pod is currently undergoing backoff in the podBackoff structure
|
// isPodBackingOff returns true if a pod is still waiting for its backoff timer.
|
||||||
|
// If this returns true, the pod should not be re-tried.
|
||||||
func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
|
func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
|
||||||
boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
|
boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
|
||||||
if !exists {
|
if !exists {
|
||||||
@ -411,11 +412,10 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
|
|||||||
return p.schedulingCycle
|
return p.schedulingCycle
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddUnschedulableIfNotPresent does nothing if the pod is present in any
|
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
|
||||||
// queue. If pod is unschedulable, it adds pod to unschedulable queue if
|
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
|
||||||
// p.moveRequestCycle > podSchedulingCycle or to backoff queue if p.moveRequestCycle
|
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
|
||||||
// <= podSchedulingCycle but pod is subject to backoff. In other cases, it adds pod to
|
// request, then the pod is put in `podBackoffQ`.
|
||||||
// active queue.
|
|
||||||
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
|
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
@ -430,30 +430,26 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC
|
|||||||
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
|
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
|
||||||
return fmt.Errorf("pod is already present in the backoffQ")
|
return fmt.Errorf("pod is already present in the backoffQ")
|
||||||
}
|
}
|
||||||
if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
|
|
||||||
p.backoffPod(pod)
|
|
||||||
p.unschedulableQ.addOrUpdate(pInfo)
|
|
||||||
p.nominatedPods.add(pod, "")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If a move request has been received and the pod is subject to backoff, move it to the BackoffQ.
|
// Every unschedulable pod is subject to backoff timers.
|
||||||
if p.isPodBackingOff(pod) && isPodUnschedulable(pod) {
|
p.backoffPod(pod)
|
||||||
err := p.podBackoffQ.Add(pInfo)
|
|
||||||
if err != nil {
|
// If a move request has been received, move it to the BackoffQ, otherwise move
|
||||||
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
|
// it to unschedulableQ.
|
||||||
} else {
|
if p.moveRequestCycle >= podSchedulingCycle {
|
||||||
p.nominatedPods.add(pod, "")
|
if err := p.podBackoffQ.Add(pInfo); err != nil {
|
||||||
|
// TODO: Delete this klog call and log returned errors at the call site.
|
||||||
|
err = fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
|
||||||
|
klog.Error(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return err
|
} else {
|
||||||
|
p.unschedulableQ.addOrUpdate(pInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.activeQ.Add(pInfo)
|
p.nominatedPods.add(pod, "")
|
||||||
if err == nil {
|
return nil
|
||||||
p.nominatedPods.add(pod, "")
|
|
||||||
p.cond.Broadcast()
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
|
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
|
||||||
|
@ -184,16 +184,14 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
|||||||
q := NewPriorityQueue(nil)
|
q := NewPriorityQueue(nil)
|
||||||
q.Add(&highPriNominatedPod)
|
q.Add(&highPriNominatedPod)
|
||||||
q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything.
|
q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything.
|
||||||
q.AddUnschedulableIfNotPresent(&medPriorityPod, q.SchedulingCycle()) // This should go to activeQ.
|
|
||||||
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
|
||||||
expectedNominatedPods := &nominatedPodMap{
|
expectedNominatedPods := &nominatedPodMap{
|
||||||
nominatedPodToNode: map[types.UID]string{
|
nominatedPodToNode: map[types.UID]string{
|
||||||
medPriorityPod.UID: "node1",
|
|
||||||
unschedulablePod.UID: "node1",
|
unschedulablePod.UID: "node1",
|
||||||
highPriNominatedPod.UID: "node1",
|
highPriNominatedPod.UID: "node1",
|
||||||
},
|
},
|
||||||
nominatedPods: map[string][]*v1.Pod{
|
nominatedPods: map[string][]*v1.Pod{
|
||||||
"node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod},
|
"node1": {&highPriNominatedPod, &unschedulablePod},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
|
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
|
||||||
@ -202,9 +200,6 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
|||||||
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
|
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Name)
|
||||||
}
|
}
|
||||||
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
|
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
|
|
||||||
}
|
|
||||||
if len(q.nominatedPods.nominatedPods) != 1 {
|
if len(q.nominatedPods.nominatedPods) != 1 {
|
||||||
t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods)
|
t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods)
|
||||||
}
|
}
|
||||||
@ -213,11 +208,11 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestPriorityQueue_AddUnschedulableIfNotPresent_Async tests scenario when
|
// TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff tests scenario when
|
||||||
// AddUnschedulableIfNotPresent is called asynchronously pods in and before
|
// AddUnschedulableIfNotPresent is called asynchronously pods in and before
|
||||||
// current scheduling cycle will be put back to activeQueue if we were trying
|
// current scheduling cycle will be put back to activeQueue if we were trying
|
||||||
// to schedule them when we received move request.
|
// to schedule them when we received move request.
|
||||||
func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
|
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
|
||||||
q := NewPriorityQueue(nil)
|
q := NewPriorityQueue(nil)
|
||||||
totalNum := 10
|
totalNum := 10
|
||||||
expectedPods := make([]v1.Pod, 0, totalNum)
|
expectedPods := make([]v1.Pod, 0, totalNum)
|
||||||
@ -248,10 +243,14 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
|
|||||||
|
|
||||||
// move all pods to active queue when we were trying to schedule them
|
// move all pods to active queue when we were trying to schedule them
|
||||||
q.MoveAllToActiveQueue()
|
q.MoveAllToActiveQueue()
|
||||||
moveReqChan := make(chan struct{})
|
oldCycle := q.SchedulingCycle()
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(totalNum - 1)
|
firstPod, _ := q.Pop()
|
||||||
// mark pods[1] ~ pods[totalNum-1] as unschedulable, fire goroutines to add them back later
|
if !reflect.DeepEqual(&expectedPods[0], firstPod) {
|
||||||
|
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod)
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark pods[1] ~ pods[totalNum-1] as unschedulable and add them back
|
||||||
for i := 1; i < totalNum; i++ {
|
for i := 1; i < totalNum; i++ {
|
||||||
unschedulablePod := expectedPods[i].DeepCopy()
|
unschedulablePod := expectedPods[i].DeepCopy()
|
||||||
unschedulablePod.Status = v1.PodStatus{
|
unschedulablePod.Status = v1.PodStatus{
|
||||||
@ -263,24 +262,15 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
cycle := q.SchedulingCycle()
|
|
||||||
go func() {
|
q.AddUnschedulableIfNotPresent(unschedulablePod, oldCycle)
|
||||||
<-moveReqChan
|
|
||||||
q.AddUnschedulableIfNotPresent(unschedulablePod, cycle)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
firstPod, _ := q.Pop()
|
|
||||||
if !reflect.DeepEqual(&expectedPods[0], firstPod) {
|
// Since there was a move request at the same cycle as "oldCycle", these pods
|
||||||
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod)
|
// should be in the backoff queue.
|
||||||
}
|
|
||||||
// close moveReqChan here to make sure q.AddUnschedulableIfNotPresent is called after another pod is popped
|
|
||||||
close(moveReqChan)
|
|
||||||
wg.Wait()
|
|
||||||
// all other pods should be in active queue again
|
|
||||||
for i := 1; i < totalNum; i++ {
|
for i := 1; i < totalNum; i++ {
|
||||||
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&expectedPods[i])); !exists {
|
if _, exists, _ := q.podBackoffQ.Get(newPodInfoNoTimestamp(&expectedPods[i])); !exists {
|
||||||
t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name)
|
t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user