mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Refine schedulerQueue test to avoid unneeded locks
- revert PR 88331 - remove createAndRunPriorityQueue() - PriorityQueue is created and not run by default - flushXYZ() is called on demand by manipulating FakeClock and Step()
This commit is contained in:
parent
de66c643c4
commit
8c2a540cdc
@ -107,8 +107,6 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
|
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
|
||||||
p.lock.Lock()
|
|
||||||
defer p.lock.Unlock()
|
|
||||||
pInfo := p.unschedulableQ.get(pod)
|
pInfo := p.unschedulableQ.get(pod)
|
||||||
if pInfo != nil {
|
if pInfo != nil {
|
||||||
return pInfo.Pod
|
return pInfo.Pod
|
||||||
@ -117,7 +115,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_Add(t *testing.T) {
|
func TestPriorityQueue_Add(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
if err := q.Add(&medPriorityPod); err != nil {
|
if err := q.Add(&medPriorityPod); err != nil {
|
||||||
t.Errorf("add failed: %v", err)
|
t.Errorf("add failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -159,7 +157,7 @@ func newDefaultQueueSort() framework.LessFunc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
|
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
if err := q.Add(&medPriorityPod); err != nil {
|
if err := q.Add(&medPriorityPod); err != nil {
|
||||||
t.Errorf("add failed: %v", err)
|
t.Errorf("add failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -175,7 +173,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
q.Add(&highPriNominatedPod)
|
q.Add(&highPriNominatedPod)
|
||||||
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
|
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
|
||||||
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
|
||||||
@ -207,7 +205,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
|||||||
// Pods in and before current scheduling cycle will be put back to activeQueue
|
// Pods in and before current scheduling cycle will be put back to activeQueue
|
||||||
// if we were trying to schedule them when we received move request.
|
// if we were trying to schedule them when we received move request.
|
||||||
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
|
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now())))
|
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now())))
|
||||||
totalNum := 10
|
totalNum := 10
|
||||||
expectedPods := make([]v1.Pod, 0, totalNum)
|
expectedPods := make([]v1.Pod, 0, totalNum)
|
||||||
for i := 0; i < totalNum; i++ {
|
for i := 0; i < totalNum; i++ {
|
||||||
@ -262,7 +260,6 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
q.lock.RLock()
|
|
||||||
// Since there was a move request at the same cycle as "oldCycle", these pods
|
// Since there was a move request at the same cycle as "oldCycle", these pods
|
||||||
// should be in the backoff queue.
|
// should be in the backoff queue.
|
||||||
for i := 1; i < totalNum; i++ {
|
for i := 1; i < totalNum; i++ {
|
||||||
@ -270,11 +267,10 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
|
|||||||
t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name)
|
t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_Pop(t *testing.T) {
|
func TestPriorityQueue_Pop(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@ -291,44 +287,36 @@ func TestPriorityQueue_Pop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_Update(t *testing.T) {
|
func TestPriorityQueue_Update(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
q.Update(nil, &highPriorityPod)
|
q.Update(nil, &highPriorityPod)
|
||||||
q.lock.RLock()
|
|
||||||
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriorityPod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriorityPod)); !exists {
|
||||||
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
|
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
if len(q.nominatedPods.nominatedPods) != 0 {
|
if len(q.nominatedPods.nominatedPods) != 0 {
|
||||||
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
|
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
|
||||||
}
|
}
|
||||||
// Update highPriorityPod and add a nominatedNodeName to it.
|
// Update highPriorityPod and add a nominatedNodeName to it.
|
||||||
q.Update(&highPriorityPod, &highPriNominatedPod)
|
q.Update(&highPriorityPod, &highPriNominatedPod)
|
||||||
q.lock.RLock()
|
|
||||||
if q.activeQ.Len() != 1 {
|
if q.activeQ.Len() != 1 {
|
||||||
t.Error("Expected only one item in activeQ.")
|
t.Error("Expected only one item in activeQ.")
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
if len(q.nominatedPods.nominatedPods) != 1 {
|
if len(q.nominatedPods.nominatedPods) != 1 {
|
||||||
t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods)
|
t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods)
|
||||||
}
|
}
|
||||||
// Updating an unschedulable pod which is not in any of the two queues, should
|
// Updating an unschedulable pod which is not in any of the two queues, should
|
||||||
// add the pod to activeQ.
|
// add the pod to activeQ.
|
||||||
q.Update(&unschedulablePod, &unschedulablePod)
|
q.Update(&unschedulablePod, &unschedulablePod)
|
||||||
q.lock.RLock()
|
|
||||||
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
||||||
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
|
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
// Updating a pod that is already in activeQ, should not change it.
|
// Updating a pod that is already in activeQ, should not change it.
|
||||||
q.Update(&unschedulablePod, &unschedulablePod)
|
q.Update(&unschedulablePod, &unschedulablePod)
|
||||||
if len(q.unschedulableQ.podInfoMap) != 0 {
|
if len(q.unschedulableQ.podInfoMap) != 0 {
|
||||||
t.Error("Expected unschedulableQ to be empty.")
|
t.Error("Expected unschedulableQ to be empty.")
|
||||||
}
|
}
|
||||||
q.lock.RLock()
|
|
||||||
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
||||||
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
|
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod {
|
if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
|
||||||
}
|
}
|
||||||
@ -347,20 +335,18 @@ func TestPriorityQueue_Update(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_Delete(t *testing.T) {
|
func TestPriorityQueue_Delete(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
q.Update(&highPriorityPod, &highPriNominatedPod)
|
q.Update(&highPriorityPod, &highPriNominatedPod)
|
||||||
q.Add(&unschedulablePod)
|
q.Add(&unschedulablePod)
|
||||||
if err := q.Delete(&highPriNominatedPod); err != nil {
|
if err := q.Delete(&highPriNominatedPod); err != nil {
|
||||||
t.Errorf("delete failed: %v", err)
|
t.Errorf("delete failed: %v", err)
|
||||||
}
|
}
|
||||||
q.lock.RLock()
|
|
||||||
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists {
|
||||||
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
|
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
|
||||||
}
|
}
|
||||||
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriNominatedPod)); exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriNominatedPod)); exists {
|
||||||
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
|
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
if len(q.nominatedPods.nominatedPods) != 1 {
|
if len(q.nominatedPods.nominatedPods) != 1 {
|
||||||
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods)
|
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods)
|
||||||
}
|
}
|
||||||
@ -373,13 +359,11 @@ func TestPriorityQueue_Delete(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
|
func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
q.Add(&medPriorityPod)
|
q.Add(&medPriorityPod)
|
||||||
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle())
|
||||||
q.MoveAllToActiveOrBackoffQueue("test")
|
q.MoveAllToActiveOrBackoffQueue("test")
|
||||||
q.lock.RLock()
|
|
||||||
defer q.lock.RUnlock()
|
|
||||||
if q.activeQ.Len() != 1 {
|
if q.activeQ.Len() != 1 {
|
||||||
t.Error("Expected 1 item to be in activeQ")
|
t.Error("Expected 1 item to be in activeQ")
|
||||||
}
|
}
|
||||||
@ -425,7 +409,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c := clock.NewFakeClock(time.Now())
|
c := clock.NewFakeClock(time.Now())
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
q.Add(&medPriorityPod)
|
q.Add(&medPriorityPod)
|
||||||
// Add a couple of pods to the unschedulableQ.
|
// Add a couple of pods to the unschedulableQ.
|
||||||
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
||||||
@ -439,11 +423,9 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
|
|||||||
if getUnschedulablePod(q, affinityPod) != nil {
|
if getUnschedulablePod(q, affinityPod) != nil {
|
||||||
t.Error("affinityPod is still in the unschedulableQ.")
|
t.Error("affinityPod is still in the unschedulableQ.")
|
||||||
}
|
}
|
||||||
q.lock.RLock()
|
|
||||||
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(affinityPod)); !exists {
|
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(affinityPod)); !exists {
|
||||||
t.Error("affinityPod is not moved to activeQ.")
|
t.Error("affinityPod is not moved to activeQ.")
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
// Check that the other pod is still in the unschedulableQ.
|
// Check that the other pod is still in the unschedulableQ.
|
||||||
if getUnschedulablePod(q, &unschedulablePod) == nil {
|
if getUnschedulablePod(q, &unschedulablePod) == nil {
|
||||||
t.Error("unschedulablePod is not in the unschedulableQ.")
|
t.Error("unschedulablePod is not in the unschedulableQ.")
|
||||||
@ -451,7 +433,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
|
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
q.Add(&medPriorityPod)
|
q.Add(&medPriorityPod)
|
||||||
q.Add(&unschedulablePod)
|
q.Add(&unschedulablePod)
|
||||||
q.Add(&highPriorityPod)
|
q.Add(&highPriorityPod)
|
||||||
@ -476,7 +458,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
|
|||||||
return pendingSet
|
return pendingSet
|
||||||
}
|
}
|
||||||
|
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
q.Add(&medPriorityPod)
|
q.Add(&medPriorityPod)
|
||||||
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle())
|
||||||
@ -493,7 +475,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
|
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
if err := q.Add(&medPriorityPod); err != nil {
|
if err := q.Add(&medPriorityPod); err != nil {
|
||||||
t.Errorf("add failed: %v", err)
|
t.Errorf("add failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -562,7 +544,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_NewWithOptions(t *testing.T) {
|
func TestPriorityQueue_NewWithOptions(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(
|
q := NewPriorityQueue(
|
||||||
newDefaultQueueSort(),
|
newDefaultQueueSort(),
|
||||||
WithPodInitialBackoffDuration(2*time.Second),
|
WithPodInitialBackoffDuration(2*time.Second),
|
||||||
WithPodMaxBackoffDuration(20*time.Second),
|
WithPodMaxBackoffDuration(20*time.Second),
|
||||||
@ -735,7 +717,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "PriorityQueue close",
|
name: "PriorityQueue close",
|
||||||
q: createAndRunPriorityQueue(newDefaultQueueSort()),
|
q: NewPriorityQueue(newDefaultQueueSort()),
|
||||||
expectedErr: fmt.Errorf(queueClosed),
|
expectedErr: fmt.Errorf(queueClosed),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -764,7 +746,8 @@ func TestSchedulingQueue_Close(t *testing.T) {
|
|||||||
// ensures that an unschedulable pod does not block head of the queue when there
|
// ensures that an unschedulable pod does not block head of the queue when there
|
||||||
// are frequent events that move pods to the active queue.
|
// are frequent events that move pods to the active queue.
|
||||||
func TestRecentlyTriedPodsGoBack(t *testing.T) {
|
func TestRecentlyTriedPodsGoBack(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
c := clock.NewFakeClock(time.Now())
|
||||||
|
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
// Add a few pods to priority queue.
|
// Add a few pods to priority queue.
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
p := v1.Pod{
|
p := v1.Pod{
|
||||||
@ -782,6 +765,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
|
|||||||
}
|
}
|
||||||
q.Add(&p)
|
q.Add(&p)
|
||||||
}
|
}
|
||||||
|
c.Step(time.Microsecond)
|
||||||
// Simulate a pod being popped by the scheduler, determined unschedulable, and
|
// Simulate a pod being popped by the scheduler, determined unschedulable, and
|
||||||
// then moved back to the active queue.
|
// then moved back to the active queue.
|
||||||
p1, err := q.Pop()
|
p1, err := q.Pop()
|
||||||
@ -798,6 +782,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
|
|||||||
})
|
})
|
||||||
// Put in the unschedulable queue.
|
// Put in the unschedulable queue.
|
||||||
q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle())
|
||||||
|
c.Step(DefaultPodInitialBackoffDuration)
|
||||||
// Move all unschedulable pods to the active queue.
|
// Move all unschedulable pods to the active queue.
|
||||||
q.MoveAllToActiveOrBackoffQueue("test")
|
q.MoveAllToActiveOrBackoffQueue("test")
|
||||||
// Simulation is over. Now let's pop all pods. The pod popped first should be
|
// Simulation is over. Now let's pop all pods. The pod popped first should be
|
||||||
@ -819,7 +804,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
|
|||||||
// are frequent events that move pods to the active queue.
|
// are frequent events that move pods to the active queue.
|
||||||
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
|
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
|
||||||
c := clock.NewFakeClock(time.Now())
|
c := clock.NewFakeClock(time.Now())
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
|
|
||||||
// Add an unschedulable pod to a priority queue.
|
// Add an unschedulable pod to a priority queue.
|
||||||
// This makes a situation that the pod was tried to schedule
|
// This makes a situation that the pod was tried to schedule
|
||||||
@ -910,7 +895,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
|
|||||||
// TestHighPriorityBackoff tests that a high priority pod does not block
|
// TestHighPriorityBackoff tests that a high priority pod does not block
|
||||||
// other pods if it is unschedulable
|
// other pods if it is unschedulable
|
||||||
func TestHighPriorityBackoff(t *testing.T) {
|
func TestHighPriorityBackoff(t *testing.T) {
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort())
|
q := NewPriorityQueue(newDefaultQueueSort())
|
||||||
|
|
||||||
midPod := v1.Pod{
|
midPod := v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@ -974,7 +959,7 @@ func TestHighPriorityBackoff(t *testing.T) {
|
|||||||
// activeQ after one minutes if it is in unschedulableQ
|
// activeQ after one minutes if it is in unschedulableQ
|
||||||
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
|
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
|
||||||
c := clock.NewFakeClock(time.Now())
|
c := clock.NewFakeClock(time.Now())
|
||||||
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
midPod := v1.Pod{
|
midPod := v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "test-midpod",
|
Name: "test-midpod",
|
||||||
@ -1021,6 +1006,7 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
|
|||||||
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod), q.SchedulingCycle())
|
||||||
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod), q.SchedulingCycle())
|
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod), q.SchedulingCycle())
|
||||||
c.Step(unschedulableQTimeInterval + time.Second)
|
c.Step(unschedulableQTimeInterval + time.Second)
|
||||||
|
q.flushUnschedulableQLeftover()
|
||||||
|
|
||||||
if p, err := q.Pop(); err != nil || p.Pod != &highPod {
|
if p, err := q.Pop(); err != nil || p.Pod != &highPod {
|
||||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
|
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
|
||||||
@ -1043,17 +1029,12 @@ var (
|
|||||||
queue.AddUnschedulableIfNotPresent(pInfo, -1)
|
queue.AddUnschedulableIfNotPresent(pInfo, -1)
|
||||||
}
|
}
|
||||||
addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
|
||||||
queue.activeQ.Add(pInfo)
|
queue.activeQ.Add(pInfo)
|
||||||
queue.lock.Unlock()
|
|
||||||
}
|
}
|
||||||
updatePodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
updatePodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
|
||||||
queue.activeQ.Update(pInfo)
|
queue.activeQ.Update(pInfo)
|
||||||
queue.lock.Unlock()
|
|
||||||
}
|
}
|
||||||
addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
|
||||||
// Update pod condition to unschedulable.
|
// Update pod condition to unschedulable.
|
||||||
podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{
|
podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{
|
||||||
Type: v1.PodScheduled,
|
Type: v1.PodScheduled,
|
||||||
@ -1062,12 +1043,9 @@ var (
|
|||||||
Message: "fake scheduling failure",
|
Message: "fake scheduling failure",
|
||||||
})
|
})
|
||||||
queue.unschedulableQ.addOrUpdate(pInfo)
|
queue.unschedulableQ.addOrUpdate(pInfo)
|
||||||
queue.lock.Unlock()
|
|
||||||
}
|
}
|
||||||
addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
|
||||||
queue.lock.Lock()
|
|
||||||
queue.podBackoffQ.Add(pInfo)
|
queue.podBackoffQ.Add(pInfo)
|
||||||
queue.lock.Unlock()
|
|
||||||
}
|
}
|
||||||
moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
|
moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
|
||||||
queue.MoveAllToActiveOrBackoffQueue("test")
|
queue.MoveAllToActiveOrBackoffQueue("test")
|
||||||
@ -1165,14 +1143,13 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
||||||
var podInfoList []*framework.QueuedPodInfo
|
var podInfoList []*framework.QueuedPodInfo
|
||||||
|
|
||||||
for i, op := range test.operations {
|
for i, op := range test.operations {
|
||||||
op(queue, test.operands[i])
|
op(queue, test.operands[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.lock.Lock()
|
|
||||||
for i := 0; i < len(test.expected); i++ {
|
for i := 0; i < len(test.expected); i++ {
|
||||||
if pInfo, err := queue.activeQ.Pop(); err != nil {
|
if pInfo, err := queue.activeQ.Pop(); err != nil {
|
||||||
t.Errorf("Error while popping the head of the queue: %v", err)
|
t.Errorf("Error while popping the head of the queue: %v", err)
|
||||||
@ -1180,7 +1157,6 @@ func TestPodTimestamp(t *testing.T) {
|
|||||||
podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo))
|
podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
queue.lock.Unlock()
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(test.expected, podInfoList) {
|
if !reflect.DeepEqual(test.expected, podInfoList) {
|
||||||
t.Errorf("Unexpected QueuedPodInfo list. Expected: %v, got: %v",
|
t.Errorf("Unexpected QueuedPodInfo list. Expected: %v, got: %v",
|
||||||
@ -1324,7 +1300,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
|
|||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
resetMetrics()
|
resetMetrics()
|
||||||
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
||||||
for i, op := range test.operations {
|
for i, op := range test.operations {
|
||||||
for _, pInfo := range test.operands[i] {
|
for _, pInfo := range test.operands[i] {
|
||||||
op(queue, pInfo)
|
op(queue, pInfo)
|
||||||
@ -1353,7 +1329,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
|
|||||||
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
|
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
|
||||||
// Add -> Pop.
|
// Add -> Pop.
|
||||||
c := clock.NewFakeClock(timestamp)
|
c := clock.NewFakeClock(timestamp)
|
||||||
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
queue.Add(pod)
|
queue.Add(pod)
|
||||||
pInfo, err := queue.Pop()
|
pInfo, err := queue.Pop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1364,7 +1340,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
|
|||||||
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
|
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
|
||||||
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
|
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
|
||||||
c = clock.NewFakeClock(timestamp)
|
c = clock.NewFakeClock(timestamp)
|
||||||
queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
queue.Add(pod)
|
queue.Add(pod)
|
||||||
pInfo, err = queue.Pop()
|
pInfo, err = queue.Pop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1384,7 +1360,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
|
|||||||
// Case 3: Similar to case 2, but before the second pop, call update, the queue operations are
|
// Case 3: Similar to case 2, but before the second pop, call update, the queue operations are
|
||||||
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
|
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
|
||||||
c = clock.NewFakeClock(timestamp)
|
c = clock.NewFakeClock(timestamp)
|
||||||
queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
|
||||||
queue.Add(pod)
|
queue.Add(pod)
|
||||||
pInfo, err = queue.Pop()
|
pInfo, err = queue.Pop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1483,8 +1459,6 @@ func TestIncomingPodsMetrics(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
metrics.SchedulerQueueIncomingPods.Reset()
|
metrics.SchedulerQueueIncomingPods.Reset()
|
||||||
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
|
||||||
queue.Close()
|
|
||||||
queue.Run()
|
|
||||||
for _, op := range test.operations {
|
for _, op := range test.operations {
|
||||||
for _, pInfo := range pInfos {
|
for _, pInfo := range pInfos {
|
||||||
op(queue, pInfo)
|
op(queue, pInfo)
|
||||||
@ -1508,12 +1482,6 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Qu
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createAndRunPriorityQueue(lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
|
|
||||||
q := NewPriorityQueue(lessFn, opts...)
|
|
||||||
q.Run()
|
|
||||||
return q
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBackOffFlow(t *testing.T) {
|
func TestBackOffFlow(t *testing.T) {
|
||||||
cl := clock.NewFakeClock(time.Now())
|
cl := clock.NewFakeClock(time.Now())
|
||||||
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(cl))
|
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(cl))
|
||||||
@ -1558,11 +1526,9 @@ func TestBackOffFlow(t *testing.T) {
|
|||||||
// An event happens.
|
// An event happens.
|
||||||
q.MoveAllToActiveOrBackoffQueue("deleted pod")
|
q.MoveAllToActiveOrBackoffQueue("deleted pod")
|
||||||
|
|
||||||
q.lock.RLock()
|
|
||||||
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
|
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
|
||||||
t.Errorf("pod %v is not in the backoff queue", podID)
|
t.Errorf("pod %v is not in the backoff queue", podID)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
|
|
||||||
// Check backoff duration.
|
// Check backoff duration.
|
||||||
deadline := q.getBackoffTime(podInfo)
|
deadline := q.getBackoffTime(podInfo)
|
||||||
@ -1575,19 +1541,15 @@ func TestBackOffFlow(t *testing.T) {
|
|||||||
cl.Step(time.Millisecond)
|
cl.Step(time.Millisecond)
|
||||||
q.flushBackoffQCompleted()
|
q.flushBackoffQCompleted()
|
||||||
// Still in backoff queue after an early flush.
|
// Still in backoff queue after an early flush.
|
||||||
q.lock.RLock()
|
|
||||||
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
|
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
|
||||||
t.Errorf("pod %v is not in the backoff queue", podID)
|
t.Errorf("pod %v is not in the backoff queue", podID)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
// Moved out of the backoff queue after timeout.
|
// Moved out of the backoff queue after timeout.
|
||||||
cl.Step(backoff)
|
cl.Step(backoff)
|
||||||
q.flushBackoffQCompleted()
|
q.flushBackoffQCompleted()
|
||||||
q.lock.RLock()
|
|
||||||
if _, ok, _ := q.podBackoffQ.Get(podInfo); ok {
|
if _, ok, _ := q.podBackoffQ.Get(podInfo); ok {
|
||||||
t.Errorf("pod %v is still in the backoff queue", podID)
|
t.Errorf("pod %v is still in the backoff queue", podID)
|
||||||
}
|
}
|
||||||
q.lock.RUnlock()
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user