scheduler: fix tracking of concurrent events

The previous approach was based on the assumption that an in-flight pod can use
the head of the received event list as marker for identifying all events that
occur while the pod is in flight. That assumption is incorrect: when that
existing element gets removed from the list because all pods that were
in-flight when it was received are done, that marker's Next method returns nil
and the code which should have seen several concurrent events (if there were
any) missed all of those.

As a result, a pod with concurrent events could incorrectly get moved to the
unschedulable queue where it could get stuck until the next periodic purging
after 5 minutes if there was no other event for it.

The approach with maintaining a single list of concurrent events can be fixed
by inserting each in-flight pod into the list and using that element to
identify "more recent" events for the pod.
This commit is contained in:
Patrick Ohly 2023-09-04 15:48:45 +02:00
parent 6eca142082
commit cd943dd95e
2 changed files with 186 additions and 163 deletions

View File

@ -165,9 +165,26 @@ type PriorityQueue struct {
// inFlightPods holds the UID of all pods which have been popped out for which Done
// hasn't been called yet - in other words, all pods that are currently being
// processed (being scheduled, in permit, or in the binding cycle).
inFlightPods map[types.UID]inFlightPod
// receivedEvents holds the events received by the scheduling queue.
receivedEvents *list.List
//
// The values in the map are the entry of each pod in the inFlightEvents list.
// The value of that entry is the *v1.Pod at the time that scheduling of that
// pod started, which can be useful for logging or debugging.
inFlightPods map[types.UID]*list.Element
// inFlightEvents holds the events received by the scheduling queue
// (entry value is clusterEvent) together with in-flight pods (entry
// value is *v1.Pod). Entries get added at the end while the mutex is
// locked, so they get serialized.
//
// The pod entries are added in Pop and used to track which events
// occurred after the pod scheduling attempt for that pod started.
// They get removed when the scheduling attempt is done, at which
// point all events that occurred in the meantime are processed.
//
// After removal of a pod, events at the start of the list are no
// longer needed because all of the other in-flight pods started
// later. Those events can be removed.
inFlightEvents *list.List
// activeQ is a heap structure that the scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod.
@ -212,11 +229,6 @@ type QueueingHintFunction struct {
QueueingHintFn framework.QueueingHintFn
}
type inFlightPod struct {
// previousEvent is the latest observed event when the pod is popped.
previousEvent *list.Element
}
// clusterEvent has the event and involved objects.
type clusterEvent struct {
event framework.ClusterEvent
@ -224,11 +236,6 @@ type clusterEvent struct {
oldObj interface{}
// newObj is the object that involved this event.
newObj interface{}
// inFlightPodsNum is the counter of pods referring to this cluster event.
// It is initialized with the number of Pods being scheduled when the event is received,
// and is decremented when the scheduling for those Pods are Done().
inFlightPodsNum int
}
type priorityQueueOptions struct {
@ -364,8 +371,8 @@ func NewPriorityQueue(
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
inFlightPods: make(map[types.UID]inFlightPod),
receivedEvents: list.New(),
inFlightPods: make(map[types.UID]*list.Element),
inFlightEvents: list.New(),
preEnqueuePluginMap: options.preEnqueuePluginMap,
queueingHintMap: options.queueingHintMap,
metricsRecorder: options.metricsRecorder,
@ -607,36 +614,48 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) framework.QueueingHint {
if len(pInfo.UnschedulablePlugins) == 0 {
// When there is no unschedulable plugin, we cannot have a guess which event makes this Pod schedulable.
// Here, we use the latest requestCycle so that this Pod won't be stuck in the unschedulable pod pool for a long time.
if p.receivedEvents.Len() != 0 {
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods))
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
// So, given pInfo should have been Pop()ed before,
// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
if !ok {
// This can happen while updating a pod. In that case pInfo.UnschedulablePlugins should
// be empty. If it is not, we may have a problem.
if len(pInfo.UnschedulablePlugins) != 0 {
logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
}
if p.inFlightEvents.Len() > len(p.inFlightPods) {
return framework.QueueAfterBackoff
}
return framework.QueueSkip
}
inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
if !ok {
// It shouldn't reach here unless there is a bug somewhere.
// But, set podSchedulingCycle to moveRequestCycle
// so that this Pod won't stuck in the unschedulable pod pool.
logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
if len(pInfo.UnschedulablePlugins) == 0 {
// When there is no unschedulable plugin, we cannot guess which event makes this Pod schedulable.
// If there has been any concurrent event for the pod, it has to go to the backoff queue because the event
// may have been relevant.
for event := inFlightPod.Next(); event != nil; event = event.Next() {
_, ok := event.Value.(*clusterEvent)
if ok {
// There really was a concurrent event.
return framework.QueueAfterBackoff
}
}
return framework.QueueSkip
}
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
// So, given pInfo should have been Pop()ed before,
// we can assume pInfo must be recorded in inFlightPods.
// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
event := p.receivedEvents.Front()
if inFlightPod.previousEvent != nil {
// only check events that happened after the Pod was popped.
event = inFlightPod.previousEvent.Next()
}
schedulingHint := framework.QueueSkip
for ; event != nil; event = event.Next() {
e := event.Value.(*clusterEvent)
for event := inFlightPod.Next(); event != nil; event = event.Next() {
e, ok := event.Value.(*clusterEvent)
if !ok {
// Must be another pod. Can be ignored.
continue
}
logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label)
hint := p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj)
if hint == framework.QueueSkip {
@ -816,11 +835,9 @@ func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
pInfo := obj.(*framework.QueuedPodInfo)
pInfo.Attempts++
p.schedulingCycle++
// In flight, no move request yet.
// In flight, no concurrent events yet.
if p.isSchedulingQueueHintEnabled {
p.inFlightPods[pInfo.Pod.UID] = inFlightPod{
previousEvent: p.receivedEvents.Back(),
}
p.inFlightPods[pInfo.Pod.UID] = p.inFlightEvents.PushBack(pInfo.Pod)
}
for plugin := range pInfo.UnschedulablePlugins {
@ -842,7 +859,7 @@ func (p *PriorityQueue) Done(pod types.UID) {
func (p *PriorityQueue) done(pod types.UID) {
if !p.isSchedulingQueueHintEnabled {
// do nothing if schedulingQueueHint is disabled.
// In that case, we don't have inFlightPods and receivedEvents.
// In that case, we don't have inFlightPods and inFlightEvents.
return
}
inFlightPod, ok := p.inFlightPods[pod]
@ -852,35 +869,25 @@ func (p *PriorityQueue) done(pod types.UID) {
}
delete(p.inFlightPods, pod)
// remove events which is only referred from this Pod
// so that the receivedEvents map doesn't grow infinitely.
// Remove the pod from the list.
p.inFlightEvents.Remove(inFlightPod)
// Find the event that we should start.
// case1. If the previousEvent is nil, it means no receivedEvents when this Pod's scheduling started.
// We start from the first event in the receivedEvents.
// case2. If the previousEvent is not nil, but the inFlightPodsNum is 0,
// this previousEvent is removed from the list already.
// We start from the first event in the receivedEvents.
event := p.receivedEvents.Front()
if inFlightPod.previousEvent != nil && inFlightPod.previousEvent.Value.(*clusterEvent).inFlightPodsNum != 0 {
// case3. If the previousEvent is not nil, and the inFlightPodsNum is not 0,
// we can start from the next event of the previousEvent.
event = inFlightPod.previousEvent.Next()
}
for event != nil {
e := event.Value.(*clusterEvent)
// decrement inFlightPodsNum on events that happened after the Pod is popped.
e.inFlightPodsNum--
if e.inFlightPodsNum <= 0 {
// remove the event from the list if no Pod refers to it.
eventToDelete := event
// we need to take next event before removal.
event = event.Next()
p.receivedEvents.Remove(eventToDelete)
continue
// Remove events which are only referred to by this Pod
// so that the inFlightEvents list doesn't grow infinitely.
// If the pod was at the head of the list, then all
// events between it and the next pod are no longer needed
// and can be removed.
for {
e := p.inFlightEvents.Front()
if e == nil {
// Empty list.
break
}
event = event.Next()
if _, ok := e.Value.(*clusterEvent); !ok {
// A pod, must stop pruning.
break
}
p.inFlightEvents.Remove(e)
}
}
@ -1108,14 +1115,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
// (no need to check the feature gate because there is always no p.inFlightPods when the feature is disabled.)
if len(p.inFlightPods) != 0 {
logger.V(5).Info("Event received while pods are in flight", "event", event.Label, "numPods", len(p.inFlightPods))
// AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in
// AddUnschedulableIfNotPresent we need to know whether events were
// observed while scheduling them.
p.receivedEvents.PushBack(&clusterEvent{
event: event,
inFlightPodsNum: len(p.inFlightPods),
oldObj: oldObj,
newObj: newObj,
p.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldObj,
newObj: newObj,
})
}

View File

@ -179,20 +179,12 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
}
}
// clusterEventsToList builds a new list whose entries are the given
// cluster events, in the same order as in the slice.
func clusterEventsToList(clusterEvents []*clusterEvent) *list.List {
	result := list.New()
	for i := range clusterEvents {
		result.PushBack(clusterEvents[i])
	}
	return result
}
func listToClusterEvents(l *list.List) []*clusterEvent {
clusterEvents := []*clusterEvent{}
func listToValues(l *list.List) []interface{} {
var values []interface{}
for e := l.Front(); e != nil; e = e.Next() {
clusterEvents = append(clusterEvents, e.Value.(*clusterEvent))
values = append(values, e.Value)
}
return clusterEvents
return values
}
func Test_InFlightPods(t *testing.T) {
@ -213,24 +205,24 @@ func Test_InFlightPods(t *testing.T) {
// initialPods is the initial Pods in the activeQ.
initialPods []*v1.Pod
actions []action
wantInFlightPods map[types.UID]inFlightPod
wantInFlightPods []*v1.Pod
wantInFlightEvents []interface{}
wantActiveQPodNames []string
wantBackoffQPodNames []string
wantUnschedPodPoolPodNames []string
wantReceivedEvents *list.List
isSchedulingQueueHintEnabled bool
}{
{
name: "when SchedulingQueueHint is disabled, inFlightPods and receivedEvents should be empty",
name: "when SchedulingQueueHint is disabled, inFlightPods and inFlightEvents should be empty",
initialPods: []*v1.Pod{pod},
actions: []action{
// This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled.
{podPopped: pod},
// This event shouldn't be added to receivedEvents because SchedulingQueueHint is disabled.
// This event shouldn't be added to inFlightEvents because SchedulingQueueHint is disabled.
{eventHappens: &PvAdd},
},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: clusterEventsToList([]*clusterEvent{}),
wantInFlightPods: nil,
wantInFlightEvents: nil,
},
{
name: "when SchedulingQueueHint is disabled, which queue to enqueue Pod should be decided without SchedulingQueueHint",
@ -241,8 +233,8 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
wantBackoffQPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: list.New(),
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// This hint fn tells that this event doesn't make a Pod schedulable.
@ -257,33 +249,26 @@ func Test_InFlightPods(t *testing.T) {
},
},
{
name: "Pod is registered in inFlightPods with no previousEvent if Pod is popped from activeQ while no receivedEvents",
name: "Pod is registered in inFlightPods when Pod is popped from activeQ",
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod},
actions: []action{
// This won't be added to receivedEvents because no inFlightPods at this point.
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
// This Pod has no previousEvent because no receivedEvents at this point.
{podPopped: pod},
// This gets added for the pod.
{eventHappens: &PvAdd},
},
wantInFlightPods: map[types.UID]inFlightPod{
"pod1": {
// no previousEvent
},
},
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
{event: PvAdd, inFlightPodsNum: 1},
}),
wantInFlightPods: []*v1.Pod{pod},
wantInFlightEvents: []interface{}{pod, PvAdd},
},
{
name: "Pod, registered in inFlightPods with no previousEvent, is enqueued back to activeQ",
name: "Pod, registered in inFlightPods, is enqueued back to activeQ",
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod, pod2},
actions: []action{
// This won't be added to receivedEvents because no inFlightPods at this point.
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
// This Pod has no previousEvent because no receivedEvents at this point.
{podPopped: pod},
{eventHappens: &PvAdd},
{podPopped: pod2},
@ -291,74 +276,47 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod)},
},
wantBackoffQPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{
"pod2": {
// When pod is enqueued back to queue, inFlightPodsNum in previousEvent is also updated to 0.
previousEvent: &list.Element{Value: &clusterEvent{event: PvAdd, inFlightPodsNum: 0}},
},
},
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
// event: PvAdd is removed when pod is enqueued back to queue.
{event: NodeAdd, inFlightPodsNum: 1}, // inFlightPodsNum is updated from 2 to 1.
}),
wantInFlightPods: []*v1.Pod{pod2},
wantInFlightEvents: []interface{}{pod2, NodeAdd},
},
{
name: "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:0 is enqueued back to activeQ",
name: "All Pods registered in inFlightPods are enqueued back to activeQ",
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod, pod2},
actions: []action{
// This won't be added to receivedEvents because no inFlightPods at this point.
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
// This Pod has no previousEvent because no receivedEvents at this point.
{podPopped: pod},
{eventHappens: &PvAdd},
{podPopped: pod2},
{eventHappens: &NodeAdd},
{podEnqueued: newQueuedPodInfoForLookup(pod)},
{eventHappens: &CSINodeUpdate},
// pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:0.
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
},
wantBackoffQPodNames: []string{"targetpod", "targetpod2"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
// all events are correctly cleaned up.
}),
wantInFlightPods: nil,
wantInFlightEvents: nil,
},
{
name: "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero is enqueued back to activeQ",
name: "One intermediate Pod registered in inFlightPods is enqueued back to activeQ",
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod, pod2, pod3},
actions: []action{
// This won't be added to receivedEvents because no inFlightPods at this point.
// This won't be added to inFlightEvents because no inFlightPods at this point.
{eventHappens: &PvcAdd},
// This Pod has no previousEvent because no receivedEvents at this point.
{podPopped: pod},
{eventHappens: &PvAdd},
// This Pod will get previousEvent (PvAdd).
{podPopped: pod2},
{eventHappens: &NodeAdd},
// This Pod will get previousEvent (NodeAdd).
// This Pod won't be requeued again.
{podPopped: pod3},
{eventHappens: &AssignedPodAdd},
// pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero.
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
},
wantBackoffQPodNames: []string{"targetpod2"},
wantInFlightPods: map[types.UID]inFlightPod{
"pod1": {
// no previousEvent
},
"pod3": {
previousEvent: &list.Element{Value: &clusterEvent{event: NodeAdd, inFlightPodsNum: 1}},
},
},
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
{event: PvAdd, inFlightPodsNum: 1},
{event: NodeAdd, inFlightPodsNum: 1},
{event: AssignedPodAdd, inFlightPodsNum: 2},
}),
wantInFlightPods: []*v1.Pod{pod, pod3},
wantInFlightEvents: []interface{}{pod, PvAdd, NodeAdd, pod3, AssignedPodAdd},
},
{
name: "events before popping Pod are ignored",
@ -371,8 +329,8 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
wantUnschedPodPoolPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: list.New(),
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
@ -396,8 +354,8 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod)},
},
wantBackoffQPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: list.New(),
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// It will be ignored because no failed plugin.
@ -420,8 +378,8 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
wantUnschedPodPoolPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: list.New(),
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// fooPlugin1 has no queueing hint function for NodeAdd.
@ -445,8 +403,8 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
},
wantUnschedPodPoolPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: list.New(),
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
@ -470,8 +428,8 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2", "fooPlugin3")},
},
wantActiveQPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: list.New(),
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
AssignedPodAdd: {
@ -504,8 +462,8 @@ func Test_InFlightPods(t *testing.T) {
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")},
},
wantBackoffQPodNames: []string{"targetpod"},
wantInFlightPods: map[types.UID]inFlightPod{},
wantReceivedEvents: list.New(),
wantInFlightPods: nil,
wantInFlightEvents: nil,
queueingHintMap: QueueingHintMapPerProfile{
"": {
AssignedPodAdd: {
@ -559,12 +517,27 @@ func Test_InFlightPods(t *testing.T) {
}
}
if diff := cmp.Diff(test.wantInFlightPods, q.inFlightPods, cmp.AllowUnexported(inFlightPod{}, list.Element{}, clusterEvent{}), cmpopts.IgnoreFields(list.Element{}, "next", "prev", "list")); diff != "" {
actualInFlightPods := make(map[types.UID]*v1.Pod)
for uid, element := range q.inFlightPods {
actualInFlightPods[uid] = element.Value.(*v1.Pod)
}
wantInFlightPods := make(map[types.UID]*v1.Pod)
for _, pod := range test.wantInFlightPods {
wantInFlightPods[pod.UID] = pod
}
if diff := cmp.Diff(wantInFlightPods, actualInFlightPods); diff != "" {
t.Errorf("Unexpected diff in inFlightPods (-want, +got):\n%s", diff)
}
if diff := cmp.Diff(listToClusterEvents(test.wantReceivedEvents), listToClusterEvents(q.receivedEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" {
t.Errorf("Unexpected diff in receivedEvents (-want, +got):\n%s", diff)
var wantInFlightEvents []interface{}
for _, value := range test.wantInFlightEvents {
if event, ok := value.(framework.ClusterEvent); ok {
value = &clusterEvent{event: event}
}
wantInFlightEvents = append(wantInFlightEvents, value)
}
if diff := cmp.Diff(wantInFlightEvents, listToValues(q.inFlightEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" {
t.Errorf("Unexpected diff in inFlightEvents (-want, +got):\n%s", diff)
}
if test.wantActiveQPodNames != nil {
@ -1231,28 +1204,33 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
expectInFlightPods(t, q)
// Construct a Pod, but don't associate its scheduler failure to any plugin
hpp1 := highPriorityPodInfo.Pod.DeepCopy()
hpp1.Name = "hpp1"
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
expectInFlightPods(t, q, hpp1.UID)
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
expectInFlightPods(t, q)
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
hpp2 := highPriorityPodInfo.Pod.DeepCopy()
hpp2.Name = "hpp2"
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
q.activeQ.Add(q.newQueuedPodInfo(hpp2))
if p, err := q.Pop(); err != nil || p.Pod != hpp2 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
}
expectInFlightPods(t, q, hpp2.UID)
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
expectInFlightPods(t, q)
// Pods are still backing off, move the pods into backoffQ.
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
q.Add(logger, medPriorityPodInfo.Pod)
@ -1263,32 +1241,41 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
// hpp2 won't be moved.
if q.podBackoffQ.Len() != 3 {
t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
}
// pop out the pods in the backoffQ.
// This doesn't make them in-flight pods.
for q.podBackoffQ.Len() != 0 {
q.podBackoffQ.Pop()
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
q.schedulingCycle++
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID)
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID)
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, hpp1.UID)
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
q.Add(logger, medPriorityPodInfo.Pod)
for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp1, hpp2} {
if q.unschedulablePods.get(pod) == nil {
@ -1306,6 +1293,35 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
if q.podBackoffQ.Len() != 0 {
t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
}
// clonePod returns a deep copy of pod that has been renamed to newName.
// The UID of the copy is set to the concatenation of its new name and
// its namespace. The pod passed in is not modified.
func clonePod(pod *v1.Pod, newName string) *v1.Pod {
	clone := pod.DeepCopy()
	clone.Name = newName
	clone.UID = types.UID(newName + clone.Namespace)
	return clone
}
// expectInFlightPods checks that the pods tracked by the queue are exactly
// the ones with the given UIDs. Both the keys of q.inFlightPods and the pod
// entries in q.inFlightEvents are compared against uids; the order of the
// UIDs is ignored. The test is aborted via t.Fatalf on the first mismatch.
func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) {
	t.Helper()
	ignoreOrder := cmpopts.SortSlices(func(a, b types.UID) bool { return a < b })

	// First check: the keys of the inFlightPods map.
	var fromMap []types.UID
	for uid := range q.inFlightPods {
		fromMap = append(fromMap, uid)
	}
	if diff := cmp.Diff(uids, fromMap, ignoreOrder); diff != "" {
		t.Fatalf("Unexpected content of inFlightPods (-want, +have):\n%s", diff)
	}

	// Second check: the pod entries in the inFlightEvents list.
	// Entries holding any other kind of value are skipped.
	var fromList []types.UID
	for entry := q.inFlightEvents.Front(); entry != nil; entry = entry.Next() {
		pod, isPod := entry.Value.(*v1.Pod)
		if !isPod {
			continue
		}
		fromList = append(fromList, pod.UID)
	}
	if diff := cmp.Diff(uids, fromList, ignoreOrder); diff != "" {
		t.Fatalf("Unexpected pods in inFlightEvents (-want, +have):\n%s", diff)
	}
}
// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that