mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 08:17:26 +00:00
Merge pull request #120413 from pohly/scheduler-in-flight-events-fix
scheduler: fix tracking of concurrent events
This commit is contained in:
commit
a7f9e70384
@ -165,9 +165,26 @@ type PriorityQueue struct {
|
|||||||
// inFlightPods holds the UID of all pods which have been popped out for which Done
|
// inFlightPods holds the UID of all pods which have been popped out for which Done
|
||||||
// hasn't been called yet - in other words, all pods that are currently being
|
// hasn't been called yet - in other words, all pods that are currently being
|
||||||
// processed (being scheduled, in permit, or in the binding cycle).
|
// processed (being scheduled, in permit, or in the binding cycle).
|
||||||
inFlightPods map[types.UID]inFlightPod
|
//
|
||||||
// receivedEvents holds the events received by the scheduling queue.
|
// The values in the map are the entry of each pod in the inFlightEvents list.
|
||||||
receivedEvents *list.List
|
// The value of that entry is the *v1.Pod at the time that scheduling of that
|
||||||
|
// pod started, which can be useful for logging or debugging.
|
||||||
|
inFlightPods map[types.UID]*list.Element
|
||||||
|
|
||||||
|
// inFlightEvents holds the events received by the scheduling queue
|
||||||
|
// (entry value is clusterEvent) together with in-flight pods (entry
|
||||||
|
// value is *v1.Pod). Entries get added at the end while the mutex is
|
||||||
|
// locked, so they get serialized.
|
||||||
|
//
|
||||||
|
// The pod entries are added in Pop and used to track which events
|
||||||
|
// occurred after the pod scheduling attempt for that pod started.
|
||||||
|
// They get removed when the scheduling attempt is done, at which
|
||||||
|
// point all events that occurred in the meantime are processed.
|
||||||
|
//
|
||||||
|
// After removal of a pod, events at the start of the list are no
|
||||||
|
// longer needed because all of the other in-flight pods started
|
||||||
|
// later. Those events can be removed.
|
||||||
|
inFlightEvents *list.List
|
||||||
|
|
||||||
// activeQ is heap structure that scheduler actively looks at to find pods to
|
// activeQ is heap structure that scheduler actively looks at to find pods to
|
||||||
// schedule. Head of heap is the highest priority pod.
|
// schedule. Head of heap is the highest priority pod.
|
||||||
@ -212,11 +229,6 @@ type QueueingHintFunction struct {
|
|||||||
QueueingHintFn framework.QueueingHintFn
|
QueueingHintFn framework.QueueingHintFn
|
||||||
}
|
}
|
||||||
|
|
||||||
type inFlightPod struct {
|
|
||||||
// previousEvent is the latest observed event when the pod is popped.
|
|
||||||
previousEvent *list.Element
|
|
||||||
}
|
|
||||||
|
|
||||||
// clusterEvent has the event and involved objects.
|
// clusterEvent has the event and involved objects.
|
||||||
type clusterEvent struct {
|
type clusterEvent struct {
|
||||||
event framework.ClusterEvent
|
event framework.ClusterEvent
|
||||||
@ -224,11 +236,6 @@ type clusterEvent struct {
|
|||||||
oldObj interface{}
|
oldObj interface{}
|
||||||
// newObj is the object that involved this event.
|
// newObj is the object that involved this event.
|
||||||
newObj interface{}
|
newObj interface{}
|
||||||
|
|
||||||
// inFlightPodsNum is the counter of pods referring to this cluster event.
|
|
||||||
// It is initialized with the number of Pods being scheduled when the event is received,
|
|
||||||
// and is decremented when the scheduling for those Pods are Done().
|
|
||||||
inFlightPodsNum int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type priorityQueueOptions struct {
|
type priorityQueueOptions struct {
|
||||||
@ -364,8 +371,8 @@ func NewPriorityQueue(
|
|||||||
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
|
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
|
||||||
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
|
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
|
||||||
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
|
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
|
||||||
inFlightPods: make(map[types.UID]inFlightPod),
|
inFlightPods: make(map[types.UID]*list.Element),
|
||||||
receivedEvents: list.New(),
|
inFlightEvents: list.New(),
|
||||||
preEnqueuePluginMap: options.preEnqueuePluginMap,
|
preEnqueuePluginMap: options.preEnqueuePluginMap,
|
||||||
queueingHintMap: options.queueingHintMap,
|
queueingHintMap: options.queueingHintMap,
|
||||||
metricsRecorder: options.metricsRecorder,
|
metricsRecorder: options.metricsRecorder,
|
||||||
@ -607,36 +614,48 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
|
|||||||
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
|
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
|
||||||
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
|
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
|
||||||
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) framework.QueueingHint {
|
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) framework.QueueingHint {
|
||||||
if len(pInfo.UnschedulablePlugins) == 0 {
|
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods))
|
||||||
// When there is no unschedulable plugin, we cannot have a guess which event makes this Pod schedulable.
|
|
||||||
// Here, we use the latest requestCycle so that this Pod won't be stuck in the unschedulable pod pool for a long time.
|
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
|
||||||
if p.receivedEvents.Len() != 0 {
|
// So, given pInfo should have been Pop()ed before,
|
||||||
|
// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
|
||||||
|
inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
|
||||||
|
if !ok {
|
||||||
|
// This can happen while updating a pod. In that case pInfo.UnschedulablePlugins should
|
||||||
|
// be empty. If it is not, we may have a problem.
|
||||||
|
if len(pInfo.UnschedulablePlugins) != 0 {
|
||||||
|
logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
|
||||||
|
return framework.QueueAfterBackoff
|
||||||
|
}
|
||||||
|
if p.inFlightEvents.Len() > len(p.inFlightPods) {
|
||||||
return framework.QueueAfterBackoff
|
return framework.QueueAfterBackoff
|
||||||
}
|
}
|
||||||
return framework.QueueSkip
|
return framework.QueueSkip
|
||||||
}
|
}
|
||||||
|
|
||||||
inFlightPod, ok := p.inFlightPods[pInfo.Pod.UID]
|
if len(pInfo.UnschedulablePlugins) == 0 {
|
||||||
if !ok {
|
// When there is no unschedulable plugin, we cannot have a guess which event makes this Pod schedulable.
|
||||||
// It shouldn't reach here unless there is a bug somewhere.
|
// If there has been any concurrent event for the pod, it has to go to the backoff queue because the event
|
||||||
// But, set podSchedulingCycle to moveRequestCycle
|
// may have been relevant.
|
||||||
// so that this Pod won't stuck in the unschedulable pod pool.
|
for event := inFlightPod.Next(); event != nil; event = event.Next() {
|
||||||
logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
|
_, ok := event.Value.(*clusterEvent)
|
||||||
return framework.QueueAfterBackoff
|
if ok {
|
||||||
|
// There really was a concurrent event.
|
||||||
|
return framework.QueueAfterBackoff
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return framework.QueueSkip
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
|
|
||||||
// So, given pInfo should have been Pop()ed before,
|
|
||||||
// we can assume pInfo must be recorded in inFlightPods.
|
|
||||||
// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
|
// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
|
||||||
event := p.receivedEvents.Front()
|
|
||||||
if inFlightPod.previousEvent != nil {
|
|
||||||
// only check events that happened after the Pod was popped.
|
|
||||||
event = inFlightPod.previousEvent.Next()
|
|
||||||
}
|
|
||||||
schedulingHint := framework.QueueSkip
|
schedulingHint := framework.QueueSkip
|
||||||
for ; event != nil; event = event.Next() {
|
for event := inFlightPod.Next(); event != nil; event = event.Next() {
|
||||||
e := event.Value.(*clusterEvent)
|
e, ok := event.Value.(*clusterEvent)
|
||||||
|
if !ok {
|
||||||
|
// Must be another pod. Can be ignored.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label)
|
||||||
|
|
||||||
hint := p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj)
|
hint := p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj)
|
||||||
if hint == framework.QueueSkip {
|
if hint == framework.QueueSkip {
|
||||||
@ -816,11 +835,9 @@ func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
|
|||||||
pInfo := obj.(*framework.QueuedPodInfo)
|
pInfo := obj.(*framework.QueuedPodInfo)
|
||||||
pInfo.Attempts++
|
pInfo.Attempts++
|
||||||
p.schedulingCycle++
|
p.schedulingCycle++
|
||||||
// In flight, no move request yet.
|
// In flight, no concurrent events yet.
|
||||||
if p.isSchedulingQueueHintEnabled {
|
if p.isSchedulingQueueHintEnabled {
|
||||||
p.inFlightPods[pInfo.Pod.UID] = inFlightPod{
|
p.inFlightPods[pInfo.Pod.UID] = p.inFlightEvents.PushBack(pInfo.Pod)
|
||||||
previousEvent: p.receivedEvents.Back(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for plugin := range pInfo.UnschedulablePlugins {
|
for plugin := range pInfo.UnschedulablePlugins {
|
||||||
@ -842,7 +859,7 @@ func (p *PriorityQueue) Done(pod types.UID) {
|
|||||||
func (p *PriorityQueue) done(pod types.UID) {
|
func (p *PriorityQueue) done(pod types.UID) {
|
||||||
if !p.isSchedulingQueueHintEnabled {
|
if !p.isSchedulingQueueHintEnabled {
|
||||||
// do nothing if schedulingQueueHint is disabled.
|
// do nothing if schedulingQueueHint is disabled.
|
||||||
// In that case, we don't have inFlightPods and receivedEvents.
|
// In that case, we don't have inFlightPods and inFlightEvents.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
inFlightPod, ok := p.inFlightPods[pod]
|
inFlightPod, ok := p.inFlightPods[pod]
|
||||||
@ -852,35 +869,25 @@ func (p *PriorityQueue) done(pod types.UID) {
|
|||||||
}
|
}
|
||||||
delete(p.inFlightPods, pod)
|
delete(p.inFlightPods, pod)
|
||||||
|
|
||||||
// remove events which is only referred from this Pod
|
// Remove the pod from the list.
|
||||||
// so that the receivedEvents map doesn't grow infinitely.
|
p.inFlightEvents.Remove(inFlightPod)
|
||||||
|
|
||||||
// Find the event that we should start.
|
// Remove events which are only referred to by this Pod
|
||||||
// case1. If the previousEvent is nil, it means no receivedEvents when this Pod's scheduling started.
|
// so that the inFlightEvents list doesn't grow infinitely.
|
||||||
// We start from the first event in the receivedEvents.
|
// If the pod was at the head of the list, then all
|
||||||
// case2. If the previousEvent is not nil, but the inFlightPodsNum is 0,
|
// events between it and the next pod are no longer needed
|
||||||
// this previousEvent is removed from the list already.
|
// and can be removed.
|
||||||
// We start from the first event in the receivedEvents.
|
for {
|
||||||
event := p.receivedEvents.Front()
|
e := p.inFlightEvents.Front()
|
||||||
if inFlightPod.previousEvent != nil && inFlightPod.previousEvent.Value.(*clusterEvent).inFlightPodsNum != 0 {
|
if e == nil {
|
||||||
// case3. If the previousEvent is not nil, and the inFlightPodsNum is not 0,
|
// Empty list.
|
||||||
// we can start from the next event of the previousEvent.
|
break
|
||||||
event = inFlightPod.previousEvent.Next()
|
|
||||||
}
|
|
||||||
|
|
||||||
for event != nil {
|
|
||||||
e := event.Value.(*clusterEvent)
|
|
||||||
// decrement inFlightPodsNum on events that happened after the Pod is popped.
|
|
||||||
e.inFlightPodsNum--
|
|
||||||
if e.inFlightPodsNum <= 0 {
|
|
||||||
// remove the event from the list if no Pod refers to it.
|
|
||||||
eventToDelete := event
|
|
||||||
// we need to take next event before removal.
|
|
||||||
event = event.Next()
|
|
||||||
p.receivedEvents.Remove(eventToDelete)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
event = event.Next()
|
if _, ok := e.Value.(*clusterEvent); !ok {
|
||||||
|
// A pod, must stop pruning.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
p.inFlightEvents.Remove(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1108,14 +1115,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
|
|||||||
|
|
||||||
// (no need to check the feature gate because there is always no p.inFlightPods when the feature is disabled.)
|
// (no need to check the feature gate because there is always no p.inFlightPods when the feature is disabled.)
|
||||||
if len(p.inFlightPods) != 0 {
|
if len(p.inFlightPods) != 0 {
|
||||||
|
logger.V(5).Info("Event received while pods are in flight", "event", event.Label, "numPods", len(p.inFlightPods))
|
||||||
// AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in
|
// AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in
|
||||||
// AddUnschedulableIfNotPresent we need to know whether events were
|
// AddUnschedulableIfNotPresent we need to know whether events were
|
||||||
// observed while scheduling them.
|
// observed while scheduling them.
|
||||||
p.receivedEvents.PushBack(&clusterEvent{
|
p.inFlightEvents.PushBack(&clusterEvent{
|
||||||
event: event,
|
event: event,
|
||||||
inFlightPodsNum: len(p.inFlightPods),
|
oldObj: oldObj,
|
||||||
oldObj: oldObj,
|
newObj: newObj,
|
||||||
newObj: newObj,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,32 +179,37 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func clusterEventsToList(clusterEvents []*clusterEvent) *list.List {
|
func listToValues(l *list.List) []interface{} {
|
||||||
l := list.New()
|
var values []interface{}
|
||||||
for _, event := range clusterEvents {
|
|
||||||
l.PushBack(event)
|
|
||||||
}
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
|
|
||||||
func listToClusterEvents(l *list.List) []*clusterEvent {
|
|
||||||
clusterEvents := []*clusterEvent{}
|
|
||||||
for e := l.Front(); e != nil; e = e.Next() {
|
for e := l.Front(); e != nil; e = e.Next() {
|
||||||
clusterEvents = append(clusterEvents, e.Value.(*clusterEvent))
|
values = append(values, e.Value)
|
||||||
}
|
}
|
||||||
return clusterEvents
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_InFlightPods(t *testing.T) {
|
func Test_InFlightPods(t *testing.T) {
|
||||||
pod := st.MakePod().Name("targetpod").UID("pod1").Obj()
|
pod := st.MakePod().Name("targetpod").UID("pod1").Obj()
|
||||||
pod2 := st.MakePod().Name("targetpod2").UID("pod2").Obj()
|
pod2 := st.MakePod().Name("targetpod2").UID("pod2").Obj()
|
||||||
pod3 := st.MakePod().Name("targetpod3").UID("pod3").Obj()
|
pod3 := st.MakePod().Name("targetpod3").UID("pod3").Obj()
|
||||||
|
var poppedPod, poppedPod2 *framework.QueuedPodInfo
|
||||||
|
|
||||||
type action struct {
|
type action struct {
|
||||||
// ONLY ONE of the following should be set.
|
// ONLY ONE of the following should be set.
|
||||||
eventHappens *framework.ClusterEvent
|
eventHappens *framework.ClusterEvent
|
||||||
podPopped *v1.Pod
|
podPopped *v1.Pod
|
||||||
podEnqueued *framework.QueuedPodInfo
|
podEnqueued *framework.QueuedPodInfo
|
||||||
|
callback func(t *testing.T, q *PriorityQueue)
|
||||||
|
}
|
||||||
|
|
||||||
|
popPod := func(t *testing.T, q *PriorityQueue, pod *v1.Pod) *framework.QueuedPodInfo {
|
||||||
|
p, err := q.Pop()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Pop failed: %v", err)
|
||||||
|
}
|
||||||
|
if p.Pod.UID != pod.UID {
|
||||||
|
t.Errorf("Unexpected popped pod: %v", p)
|
||||||
|
}
|
||||||
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
@ -213,24 +218,24 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
// initialPods is the initial Pods in the activeQ.
|
// initialPods is the initial Pods in the activeQ.
|
||||||
initialPods []*v1.Pod
|
initialPods []*v1.Pod
|
||||||
actions []action
|
actions []action
|
||||||
wantInFlightPods map[types.UID]inFlightPod
|
wantInFlightPods []*v1.Pod
|
||||||
|
wantInFlightEvents []interface{}
|
||||||
wantActiveQPodNames []string
|
wantActiveQPodNames []string
|
||||||
wantBackoffQPodNames []string
|
wantBackoffQPodNames []string
|
||||||
wantUnschedPodPoolPodNames []string
|
wantUnschedPodPoolPodNames []string
|
||||||
wantReceivedEvents *list.List
|
|
||||||
isSchedulingQueueHintEnabled bool
|
isSchedulingQueueHintEnabled bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "when SchedulingQueueHint is disabled, inFlightPods and receivedEvents should be empty",
|
name: "when SchedulingQueueHint is disabled, inFlightPods and inFlightEvents should be empty",
|
||||||
initialPods: []*v1.Pod{pod},
|
initialPods: []*v1.Pod{pod},
|
||||||
actions: []action{
|
actions: []action{
|
||||||
// This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled.
|
// This Pod shouldn't be added to inFlightPods because SchedulingQueueHint is disabled.
|
||||||
{podPopped: pod},
|
{podPopped: pod},
|
||||||
// This event shouldn't be added to receivedEvents because SchedulingQueueHint is disabled.
|
// This event shouldn't be added to inFlightEvents because SchedulingQueueHint is disabled.
|
||||||
{eventHappens: &PvAdd},
|
{eventHappens: &PvAdd},
|
||||||
},
|
},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: clusterEventsToList([]*clusterEvent{}),
|
wantInFlightEvents: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "when SchedulingQueueHint is disabled, which queue to enqueue Pod should be decided without SchedulingQueueHint",
|
name: "when SchedulingQueueHint is disabled, which queue to enqueue Pod should be decided without SchedulingQueueHint",
|
||||||
@ -241,8 +246,8 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
||||||
},
|
},
|
||||||
wantBackoffQPodNames: []string{"targetpod"},
|
wantBackoffQPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: list.New(),
|
wantInFlightEvents: nil,
|
||||||
queueingHintMap: QueueingHintMapPerProfile{
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
"": {
|
"": {
|
||||||
// This hint fn tells that this event doesn't make a Pod schedulable.
|
// This hint fn tells that this event doesn't make a Pod schedulable.
|
||||||
@ -257,33 +262,26 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Pod is registered in inFlightPods with no previousEvent if Pod is popped from activeQ while no receivedEvents",
|
name: "Pod is registered in inFlightPods when Pod is popped from activeQ",
|
||||||
isSchedulingQueueHintEnabled: true,
|
isSchedulingQueueHintEnabled: true,
|
||||||
initialPods: []*v1.Pod{pod},
|
initialPods: []*v1.Pod{pod},
|
||||||
actions: []action{
|
actions: []action{
|
||||||
// This won't be added to receivedEvents because no inFlightPods at this point.
|
// This won't be added to inFlightEvents because no inFlightPods at this point.
|
||||||
{eventHappens: &PvcAdd},
|
{eventHappens: &PvcAdd},
|
||||||
// This Pod has no previousEvent because no receivedEvents at this point.
|
|
||||||
{podPopped: pod},
|
{podPopped: pod},
|
||||||
|
// This gets added for the pod.
|
||||||
{eventHappens: &PvAdd},
|
{eventHappens: &PvAdd},
|
||||||
},
|
},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{
|
wantInFlightPods: []*v1.Pod{pod},
|
||||||
"pod1": {
|
wantInFlightEvents: []interface{}{pod, PvAdd},
|
||||||
// no previousEvent
|
|
||||||
},
|
|
||||||
},
|
|
||||||
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
|
|
||||||
{event: PvAdd, inFlightPodsNum: 1},
|
|
||||||
}),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Pod, registered in inFlightPods with no previousEvent, is enqueued back to activeQ",
|
name: "Pod, registered in inFlightPods, is enqueued back to activeQ",
|
||||||
isSchedulingQueueHintEnabled: true,
|
isSchedulingQueueHintEnabled: true,
|
||||||
initialPods: []*v1.Pod{pod, pod2},
|
initialPods: []*v1.Pod{pod, pod2},
|
||||||
actions: []action{
|
actions: []action{
|
||||||
// This won't be added to receivedEvents because no inFlightPods at this point.
|
// This won't be added to inFlightEvents because no inFlightPods at this point.
|
||||||
{eventHappens: &PvcAdd},
|
{eventHappens: &PvcAdd},
|
||||||
// This Pod has no previousEvent because no receivedEvents at this point.
|
|
||||||
{podPopped: pod},
|
{podPopped: pod},
|
||||||
{eventHappens: &PvAdd},
|
{eventHappens: &PvAdd},
|
||||||
{podPopped: pod2},
|
{podPopped: pod2},
|
||||||
@ -291,74 +289,47 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod)},
|
{podEnqueued: newQueuedPodInfoForLookup(pod)},
|
||||||
},
|
},
|
||||||
wantBackoffQPodNames: []string{"targetpod"},
|
wantBackoffQPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{
|
wantInFlightPods: []*v1.Pod{pod2},
|
||||||
"pod2": {
|
wantInFlightEvents: []interface{}{pod2, NodeAdd},
|
||||||
// When pod is enqueued back to queue, inFlightPodsNum in previousEvent is also updated to 0.
|
|
||||||
previousEvent: &list.Element{Value: &clusterEvent{event: PvAdd, inFlightPodsNum: 0}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
|
|
||||||
// event: PvAdd is removed when pod is enqueued back to queue.
|
|
||||||
{event: NodeAdd, inFlightPodsNum: 1}, // inFlightPodsNum is updated from 2 to 1.
|
|
||||||
}),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:0 is enqueued back to activeQ",
|
name: "All Pods registered in inFlightPods are enqueued back to activeQ",
|
||||||
isSchedulingQueueHintEnabled: true,
|
isSchedulingQueueHintEnabled: true,
|
||||||
initialPods: []*v1.Pod{pod, pod2},
|
initialPods: []*v1.Pod{pod, pod2},
|
||||||
actions: []action{
|
actions: []action{
|
||||||
// This won't be added to receivedEvents because no inFlightPods at this point.
|
// This won't be added to inFlightEvents because no inFlightPods at this point.
|
||||||
{eventHappens: &PvcAdd},
|
{eventHappens: &PvcAdd},
|
||||||
// This Pod has no previousEvent because no receivedEvents at this point.
|
|
||||||
{podPopped: pod},
|
{podPopped: pod},
|
||||||
{eventHappens: &PvAdd},
|
{eventHappens: &PvAdd},
|
||||||
{podPopped: pod2},
|
{podPopped: pod2},
|
||||||
{eventHappens: &NodeAdd},
|
{eventHappens: &NodeAdd},
|
||||||
{podEnqueued: newQueuedPodInfoForLookup(pod)},
|
{podEnqueued: newQueuedPodInfoForLookup(pod)},
|
||||||
{eventHappens: &CSINodeUpdate},
|
{eventHappens: &CSINodeUpdate},
|
||||||
// pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:0.
|
|
||||||
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
|
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
|
||||||
},
|
},
|
||||||
wantBackoffQPodNames: []string{"targetpod", "targetpod2"},
|
wantBackoffQPodNames: []string{"targetpod", "targetpod2"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
|
wantInFlightEvents: nil,
|
||||||
// all events are correctly cleaned up.
|
|
||||||
}),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Pod registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero is enqueued back to activeQ",
|
name: "One intermediate Pod registered in inFlightPods is enqueued back to activeQ",
|
||||||
isSchedulingQueueHintEnabled: true,
|
isSchedulingQueueHintEnabled: true,
|
||||||
initialPods: []*v1.Pod{pod, pod2, pod3},
|
initialPods: []*v1.Pod{pod, pod2, pod3},
|
||||||
actions: []action{
|
actions: []action{
|
||||||
// This won't be added to receivedEvents because no inFlightPods at this point.
|
// This won't be added to inFlightEvents because no inFlightPods at this point.
|
||||||
{eventHappens: &PvcAdd},
|
{eventHappens: &PvcAdd},
|
||||||
// This Pod has no previousEvent because no receivedEvents at this point.
|
|
||||||
{podPopped: pod},
|
{podPopped: pod},
|
||||||
{eventHappens: &PvAdd},
|
{eventHappens: &PvAdd},
|
||||||
// This Pod will get previousEvent (PvAdd).
|
|
||||||
{podPopped: pod2},
|
{podPopped: pod2},
|
||||||
{eventHappens: &NodeAdd},
|
{eventHappens: &NodeAdd},
|
||||||
// This Pod will get previousEvent (NodeAdd).
|
|
||||||
// This Pod won't be requeued again.
|
// This Pod won't be requeued again.
|
||||||
{podPopped: pod3},
|
{podPopped: pod3},
|
||||||
{eventHappens: &AssignedPodAdd},
|
{eventHappens: &AssignedPodAdd},
|
||||||
// pod2 is registered in inFlightPods with previousEvent with inFlightPodsNum:non-zero.
|
|
||||||
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
|
{podEnqueued: newQueuedPodInfoForLookup(pod2)},
|
||||||
},
|
},
|
||||||
wantBackoffQPodNames: []string{"targetpod2"},
|
wantBackoffQPodNames: []string{"targetpod2"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{
|
wantInFlightPods: []*v1.Pod{pod, pod3},
|
||||||
"pod1": {
|
wantInFlightEvents: []interface{}{pod, PvAdd, NodeAdd, pod3, AssignedPodAdd},
|
||||||
// no previousEvent
|
|
||||||
},
|
|
||||||
"pod3": {
|
|
||||||
previousEvent: &list.Element{Value: &clusterEvent{event: NodeAdd, inFlightPodsNum: 1}},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
wantReceivedEvents: clusterEventsToList([]*clusterEvent{
|
|
||||||
{event: PvAdd, inFlightPodsNum: 1},
|
|
||||||
{event: NodeAdd, inFlightPodsNum: 1},
|
|
||||||
{event: AssignedPodAdd, inFlightPodsNum: 2},
|
|
||||||
}),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "events before popping Pod are ignored",
|
name: "events before popping Pod are ignored",
|
||||||
@ -371,8 +342,8 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
||||||
},
|
},
|
||||||
wantUnschedPodPoolPodNames: []string{"targetpod"},
|
wantUnschedPodPoolPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: list.New(),
|
wantInFlightEvents: nil,
|
||||||
queueingHintMap: QueueingHintMapPerProfile{
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
"": {
|
"": {
|
||||||
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
|
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
|
||||||
@ -396,8 +367,8 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod)},
|
{podEnqueued: newQueuedPodInfoForLookup(pod)},
|
||||||
},
|
},
|
||||||
wantBackoffQPodNames: []string{"targetpod"},
|
wantBackoffQPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: list.New(),
|
wantInFlightEvents: nil,
|
||||||
queueingHintMap: QueueingHintMapPerProfile{
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
"": {
|
"": {
|
||||||
// It will be ignored because no failed plugin.
|
// It will be ignored because no failed plugin.
|
||||||
@ -420,8 +391,8 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
||||||
},
|
},
|
||||||
wantUnschedPodPoolPodNames: []string{"targetpod"},
|
wantUnschedPodPoolPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: list.New(),
|
wantInFlightEvents: nil,
|
||||||
queueingHintMap: QueueingHintMapPerProfile{
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
"": {
|
"": {
|
||||||
// fooPlugin1 has no queueing hint function for NodeAdd.
|
// fooPlugin1 has no queueing hint function for NodeAdd.
|
||||||
@ -445,8 +416,8 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")},
|
||||||
},
|
},
|
||||||
wantUnschedPodPoolPodNames: []string{"targetpod"},
|
wantUnschedPodPoolPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: list.New(),
|
wantInFlightEvents: nil,
|
||||||
queueingHintMap: QueueingHintMapPerProfile{
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
"": {
|
"": {
|
||||||
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
|
// fooPlugin1 has a queueing hint function for AssignedPodAdd,
|
||||||
@ -470,8 +441,8 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2", "fooPlugin3")},
|
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2", "fooPlugin3")},
|
||||||
},
|
},
|
||||||
wantActiveQPodNames: []string{"targetpod"},
|
wantActiveQPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: list.New(),
|
wantInFlightEvents: nil,
|
||||||
queueingHintMap: QueueingHintMapPerProfile{
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
"": {
|
"": {
|
||||||
AssignedPodAdd: {
|
AssignedPodAdd: {
|
||||||
@ -504,8 +475,8 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")},
|
{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1", "fooPlugin2")},
|
||||||
},
|
},
|
||||||
wantBackoffQPodNames: []string{"targetpod"},
|
wantBackoffQPodNames: []string{"targetpod"},
|
||||||
wantInFlightPods: map[types.UID]inFlightPod{},
|
wantInFlightPods: nil,
|
||||||
wantReceivedEvents: list.New(),
|
wantInFlightEvents: nil,
|
||||||
queueingHintMap: QueueingHintMapPerProfile{
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
"": {
|
"": {
|
||||||
AssignedPodAdd: {
|
AssignedPodAdd: {
|
||||||
@ -523,6 +494,54 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "pod is enqueued to activeQ because the failed plugin has a hint fn and it returns QueueImmediately for a concurrent event that was received while some other pod was in flight",
|
||||||
|
isSchedulingQueueHintEnabled: true,
|
||||||
|
initialPods: []*v1.Pod{pod, pod2},
|
||||||
|
actions: []action{
|
||||||
|
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, q, pod) }},
|
||||||
|
{eventHappens: &NodeAdd},
|
||||||
|
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, q, pod2) }},
|
||||||
|
{eventHappens: &AssignedPodAdd},
|
||||||
|
{callback: func(t *testing.T, q *PriorityQueue) {
|
||||||
|
logger, _ := ktesting.NewTestContext(t)
|
||||||
|
if err := q.AddUnschedulableIfNotPresent(logger, poppedPod, q.SchedulingCycle()); err != nil {
|
||||||
|
t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
|
||||||
|
}
|
||||||
|
}},
|
||||||
|
{callback: func(t *testing.T, q *PriorityQueue) {
|
||||||
|
logger, _ := ktesting.NewTestContext(t)
|
||||||
|
poppedPod2.UnschedulablePlugins = sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3")
|
||||||
|
if err := q.AddUnschedulableIfNotPresent(logger, poppedPod2, q.SchedulingCycle()); err != nil {
|
||||||
|
t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
|
||||||
|
}
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
wantActiveQPodNames: []string{pod2.Name},
|
||||||
|
wantInFlightPods: nil,
|
||||||
|
wantInFlightEvents: nil,
|
||||||
|
queueingHintMap: QueueingHintMapPerProfile{
|
||||||
|
"": {
|
||||||
|
AssignedPodAdd: {
|
||||||
|
{
|
||||||
|
// it will be ignored because the hint fn returns QueueSkip that is weaker than queueHintReturnQueueImmediately from fooPlugin1.
|
||||||
|
PluginName: "fooPlugin3",
|
||||||
|
QueueingHintFn: queueHintReturnQueueSkip,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// it will be ignored because the hint fn returns QueueAfterBackoff that is weaker than queueHintReturnQueueImmediately from fooPlugin1.
|
||||||
|
PluginName: "fooPlugin2",
|
||||||
|
QueueingHintFn: queueHintReturnQueueAfterBackoff,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The hint fn tells that this event makes a Pod scheudlable immediately.
|
||||||
|
PluginName: "fooPlugin1",
|
||||||
|
QueueingHintFn: queueHintReturnQueueImmediately,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
@ -544,27 +563,37 @@ func Test_InFlightPods(t *testing.T) {
|
|||||||
for _, action := range test.actions {
|
for _, action := range test.actions {
|
||||||
switch {
|
switch {
|
||||||
case action.podPopped != nil:
|
case action.podPopped != nil:
|
||||||
p, err := q.Pop()
|
popPod(t, q, action.podPopped)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Pop failed: %v", err)
|
|
||||||
}
|
|
||||||
if p.Pod.UID != action.podPopped.UID {
|
|
||||||
t.Errorf("Unexpected popped pod: %v", p)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
case action.eventHappens != nil:
|
case action.eventHappens != nil:
|
||||||
q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil)
|
q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil)
|
||||||
case action.podEnqueued != nil:
|
case action.podEnqueued != nil:
|
||||||
q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle())
|
||||||
|
case action.callback != nil:
|
||||||
|
action.callback(t, q)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(test.wantInFlightPods, q.inFlightPods, cmp.AllowUnexported(inFlightPod{}, list.Element{}, clusterEvent{}), cmpopts.IgnoreFields(list.Element{}, "next", "prev", "list")); diff != "" {
|
actualInFlightPods := make(map[types.UID]*v1.Pod)
|
||||||
|
for uid, element := range q.inFlightPods {
|
||||||
|
actualInFlightPods[uid] = element.Value.(*v1.Pod)
|
||||||
|
}
|
||||||
|
wantInFlightPods := make(map[types.UID]*v1.Pod)
|
||||||
|
for _, pod := range test.wantInFlightPods {
|
||||||
|
wantInFlightPods[pod.UID] = pod
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(wantInFlightPods, actualInFlightPods); diff != "" {
|
||||||
t.Errorf("Unexpected diff in inFlightPods (-want, +got):\n%s", diff)
|
t.Errorf("Unexpected diff in inFlightPods (-want, +got):\n%s", diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(listToClusterEvents(test.wantReceivedEvents), listToClusterEvents(q.receivedEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" {
|
var wantInFlightEvents []interface{}
|
||||||
t.Errorf("Unexpected diff in receivedEvents (-want, +got):\n%s", diff)
|
for _, value := range test.wantInFlightEvents {
|
||||||
|
if event, ok := value.(framework.ClusterEvent); ok {
|
||||||
|
value = &clusterEvent{event: event}
|
||||||
|
}
|
||||||
|
wantInFlightEvents = append(wantInFlightEvents, value)
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(wantInFlightEvents, listToValues(q.inFlightEvents), cmp.AllowUnexported(clusterEvent{})); diff != "" {
|
||||||
|
t.Errorf("Unexpected diff in inFlightEvents (-want, +got):\n%s", diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
if test.wantActiveQPodNames != nil {
|
if test.wantActiveQPodNames != nil {
|
||||||
@ -1231,28 +1260,33 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
|
|||||||
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
|
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID)
|
||||||
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
|
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
|
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
|
||||||
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
||||||
|
expectInFlightPods(t, q)
|
||||||
// Construct a Pod, but don't associate its scheduler failure to any plugin
|
// Construct a Pod, but don't associate its scheduler failure to any plugin
|
||||||
hpp1 := highPriorityPodInfo.Pod.DeepCopy()
|
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
|
||||||
hpp1.Name = "hpp1"
|
|
||||||
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
|
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
|
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, hpp1.UID)
|
||||||
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
|
||||||
|
expectInFlightPods(t, q)
|
||||||
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
|
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
|
||||||
hpp2 := highPriorityPodInfo.Pod.DeepCopy()
|
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
|
||||||
hpp2.Name = "hpp2"
|
|
||||||
q.activeQ.Add(q.newQueuedPodInfo(hpp2))
|
q.activeQ.Add(q.newQueuedPodInfo(hpp2))
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != hpp2 {
|
if p, err := q.Pop(); err != nil || p.Pod != hpp2 {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, hpp2.UID)
|
||||||
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
|
||||||
|
expectInFlightPods(t, q)
|
||||||
// Pods is still backing off, move the pod into backoffQ.
|
// Pods is still backing off, move the pod into backoffQ.
|
||||||
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
|
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
|
||||||
q.Add(logger, medPriorityPodInfo.Pod)
|
q.Add(logger, medPriorityPodInfo.Pod)
|
||||||
@ -1263,32 +1297,41 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
|
|||||||
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
|
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
|
||||||
// hpp2 won't be moved.
|
// hpp2 won't be moved.
|
||||||
if q.podBackoffQ.Len() != 3 {
|
if q.podBackoffQ.Len() != 3 {
|
||||||
t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
|
t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
// pop out the pods in the backoffQ.
|
// pop out the pods in the backoffQ.
|
||||||
|
// This doesn't make them in-flight pods.
|
||||||
for q.podBackoffQ.Len() != 0 {
|
for q.podBackoffQ.Len() != 0 {
|
||||||
q.podBackoffQ.Pop()
|
q.podBackoffQ.Pop()
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
|
||||||
|
|
||||||
q.schedulingCycle++
|
q.schedulingCycle++
|
||||||
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
|
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
|
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID)
|
||||||
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
|
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
|
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
|
||||||
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
|
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
|
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID)
|
||||||
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID)
|
||||||
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, hpp1.UID)
|
||||||
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
|
||||||
q.Add(logger, medPriorityPodInfo.Pod)
|
q.Add(logger, medPriorityPodInfo.Pod)
|
||||||
for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp1, hpp2} {
|
for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp1, hpp2} {
|
||||||
if q.unschedulablePods.get(pod) == nil {
|
if q.unschedulablePods.get(pod) == nil {
|
||||||
@ -1306,6 +1349,35 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
|
|||||||
if q.podBackoffQ.Len() != 0 {
|
if q.podBackoffQ.Len() != 0 {
|
||||||
t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
|
t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
|
||||||
}
|
}
|
||||||
|
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func clonePod(pod *v1.Pod, newName string) *v1.Pod {
|
||||||
|
pod = pod.DeepCopy()
|
||||||
|
pod.Name = newName
|
||||||
|
pod.UID = types.UID(pod.Name + pod.Namespace)
|
||||||
|
return pod
|
||||||
|
}
|
||||||
|
|
||||||
|
func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) {
|
||||||
|
t.Helper()
|
||||||
|
var actualUIDs []types.UID
|
||||||
|
for uid := range q.inFlightPods {
|
||||||
|
actualUIDs = append(actualUIDs, uid)
|
||||||
|
}
|
||||||
|
sortUIDs := cmpopts.SortSlices(func(a, b types.UID) bool { return a < b })
|
||||||
|
if diff := cmp.Diff(uids, actualUIDs, sortUIDs); diff != "" {
|
||||||
|
t.Fatalf("Unexpected content of inFlightPods (-want, +have):\n%s", diff)
|
||||||
|
}
|
||||||
|
actualUIDs = nil
|
||||||
|
for e := q.inFlightEvents.Front(); e != nil; e = e.Next() {
|
||||||
|
if pod, ok := e.Value.(*v1.Pod); ok {
|
||||||
|
actualUIDs = append(actualUIDs, pod.UID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(uids, actualUIDs, sortUIDs); diff != "" {
|
||||||
|
t.Fatalf("Unexpected pods in inFlightEvents (-want, +have):\n%s", diff)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that
|
// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that
|
||||||
|
Loading…
Reference in New Issue
Block a user