From 8c2a540cdcde6d80850b0bd7fd7970a9e24782d8 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Tue, 5 May 2020 00:11:04 -0700 Subject: [PATCH] Refine schedulerQueue test to avoid unneeded locks - revert PR 88331 - remove createAndRunPriorityQueue() - PriorityQueue is created and not run by default - flushXYZ() is called on demand by manipulating FakeClock and Step() --- .../internal/queue/scheduling_queue_test.go | 92 ++++++------------- 1 file changed, 27 insertions(+), 65 deletions(-) diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 2d136606877..ccb299c933b 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -107,8 +107,6 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. } func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { - p.lock.Lock() - defer p.lock.Unlock() pInfo := p.unschedulableQ.get(pod) if pInfo != nil { return pInfo.Pod @@ -117,7 +115,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { } func TestPriorityQueue_Add(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -159,7 +157,7 @@ func newDefaultQueueSort() framework.LessFunc { } func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -175,7 +173,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) @@ -207,7 +205,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { // Pods in and before current scheduling cycle will be put back to activeQueue // if we were trying to schedule them when we received move request. func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now()))) + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now()))) totalNum := 10 expectedPods := make([]v1.Pod, 0, totalNum) for i := 0; i < totalNum; i++ { @@ -262,7 +260,6 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } } - q.lock.RLock() // Since there was a move request at the same cycle as "oldCycle", these pods // should be in the backoff queue. for i := 1; i < totalNum; i++ { @@ -270,11 +267,10 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name) } } - q.lock.RUnlock() } func TestPriorityQueue_Pop(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -291,44 +287,36 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) q.Update(nil, &highPriorityPod) - q.lock.RLock() if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriorityPod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) } - q.lock.RUnlock() if len(q.nominatedPods.nominatedPods) != 0 { t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) } // Update highPriorityPod and add a nominatedNodeName to it. q.Update(&highPriorityPod, &highPriNominatedPod) - q.lock.RLock() if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } - q.lock.RUnlock() if len(q.nominatedPods.nominatedPods) != 1 { t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods) } // Updating an unschedulable pod which is not in any of the two queues, should // add the pod to activeQ. q.Update(&unschedulablePod, &unschedulablePod) - q.lock.RLock() if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists { t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name) } - q.lock.RUnlock() // Updating a pod that is already in activeQ, should not change it. q.Update(&unschedulablePod, &unschedulablePod) if len(q.unschedulableQ.podInfoMap) != 0 { t.Error("Expected unschedulableQ to be empty.") } - q.lock.RLock() if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists { t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name) } - q.lock.RUnlock() if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } @@ -347,20 +335,18 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) q.Update(&highPriorityPod, &highPriNominatedPod) q.Add(&unschedulablePod) if err := q.Delete(&highPriNominatedPod); err != nil { t.Errorf("delete failed: %v", err) } - q.lock.RLock() if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists { t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name) } if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriNominatedPod)); exists { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name) } - q.lock.RUnlock() if len(q.nominatedPods.nominatedPods) != 1 { t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods) } @@ -373,13 +359,11 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) q.Add(&medPriorityPod) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle()) q.MoveAllToActiveOrBackoffQueue("test") - q.lock.RLock() - defer q.lock.RUnlock() if q.activeQ.Len() != 1 { t.Error("Expected 1 item to be in activeQ") } @@ -425,7 +409,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } c := clock.NewFakeClock(time.Now()) - q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle()) @@ -439,11 +423,9 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { if getUnschedulablePod(q, affinityPod) != nil { t.Error("affinityPod is still in the unschedulableQ.") } - q.lock.RLock() if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(affinityPod)); !exists { t.Error("affinityPod is not moved to activeQ.") } - q.lock.RUnlock() // Check that the other pod is still in the unschedulableQ. if getUnschedulablePod(q, &unschedulablePod) == nil { t.Error("unschedulablePod is not in the unschedulableQ.") @@ -451,7 +433,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -476,7 +458,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { return pendingSet } - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) q.Add(&medPriorityPod) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle()) @@ -493,7 +475,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -562,7 +544,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { } func TestPriorityQueue_NewWithOptions(t *testing.T) { - q := createAndRunPriorityQueue( + q := NewPriorityQueue( newDefaultQueueSort(), WithPodInitialBackoffDuration(2*time.Second), WithPodMaxBackoffDuration(20*time.Second), @@ -735,7 +717,7 @@ func TestSchedulingQueue_Close(t *testing.T) { }{ { name: "PriorityQueue close", - q: createAndRunPriorityQueue(newDefaultQueueSort()), + q: NewPriorityQueue(newDefaultQueueSort()), expectedErr: fmt.Errorf(queueClosed), }, } @@ -764,7 +746,8 @@ func TestSchedulingQueue_Close(t *testing.T) { // ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + c := clock.NewFakeClock(time.Now()) + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) // Add a few pods to priority queue. for i := 0; i < 5; i++ { p := v1.Pod{ @@ -782,6 +765,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { } q.Add(&p) } + c.Step(time.Microsecond) // Simulate a pod being popped by the scheduler, determined unschedulable, and // then moved back to the active queue. p1, err := q.Pop() @@ -798,6 +782,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { }) // Put in the unschedulable queue. q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle()) + c.Step(DefaultPodInitialBackoffDuration) // Move all unschedulable pods to the active queue. q.MoveAllToActiveOrBackoffQueue("test") // Simulation is over. Now let's pop all pods. The pod popped first should be @@ -819,7 +804,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { // are frequent events that move pods to the active queue. func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { c := clock.NewFakeClock(time.Now()) - q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) // Add an unschedulable pod to a priority queue. // This makes a situation that the pod was tried to schedule @@ -910,7 +895,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // TestHighPriorityBackoff tests that a high priority pod does not block // other pods if it is unschedulable func TestHighPriorityBackoff(t *testing.T) { - q := createAndRunPriorityQueue(newDefaultQueueSort()) + q := NewPriorityQueue(newDefaultQueueSort()) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -974,7 +959,7 @@ func TestHighPriorityBackoff(t *testing.T) { // activeQ after one minutes if it is in unschedulableQ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { c := clock.NewFakeClock(time.Now()) - q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1021,6 +1006,7 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod), q.SchedulingCycle()) c.Step(unschedulableQTimeInterval + time.Second) + q.flushUnschedulableQLeftover() if p, err := q.Pop(); err != nil || p.Pod != &highPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) @@ -1043,17 +1029,12 @@ var ( queue.AddUnschedulableIfNotPresent(pInfo, -1) } addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.lock.Lock() queue.activeQ.Add(pInfo) - queue.lock.Unlock() } updatePodActiveQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.lock.Lock() queue.activeQ.Update(pInfo) - queue.lock.Unlock() } addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.lock.Lock() // Update pod condition to unschedulable. podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{ Type: v1.PodScheduled, @@ -1062,12 +1043,9 @@ var ( Message: "fake scheduling failure", }) queue.unschedulableQ.addOrUpdate(pInfo) - queue.lock.Unlock() } addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - queue.lock.Lock() queue.podBackoffQ.Add(pInfo) - queue.lock.Unlock() } moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.MoveAllToActiveOrBackoffQueue("test") @@ -1165,14 +1143,13 @@ func TestPodTimestamp(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) + queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) var podInfoList []*framework.QueuedPodInfo for i, op := range test.operations { op(queue, test.operands[i]) } - queue.lock.Lock() for i := 0; i < len(test.expected); i++ { if pInfo, err := queue.activeQ.Pop(); err != nil { t.Errorf("Error while popping the head of the queue: %v", err) @@ -1180,7 +1157,6 @@ func TestPodTimestamp(t *testing.T) { podInfoList = append(podInfoList, pInfo.(*framework.QueuedPodInfo)) } } - queue.lock.Unlock() if !reflect.DeepEqual(test.expected, podInfoList) { t.Errorf("Unexpected QueuedPodInfo list. Expected: %v, got: %v", @@ -1324,7 +1300,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { resetMetrics() - queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) + queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1353,7 +1329,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 1: A pod is created and scheduled after 1 attempt. The queue operations are // Add -> Pop. c := clock.NewFakeClock(timestamp) - queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) + queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err := queue.Pop() if err != nil { @@ -1364,7 +1340,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 2: A pod is created and scheduled after 2 attempts. The queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop. c = clock.NewFakeClock(timestamp) - queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) + queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1384,7 +1360,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 3: Similar to case 2, but before the second pop, call update, the queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop. c = clock.NewFakeClock(timestamp) - queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c)) + queue = NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1483,8 +1459,6 @@ func TestIncomingPodsMetrics(t *testing.T) { t.Run(test.name, func(t *testing.T) { metrics.SchedulerQueueIncomingPods.Reset() queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp))) - queue.Close() - queue.Run() for _, op := range test.operations { for _, pInfo := range pInfos { op(queue, pInfo) @@ -1508,12 +1482,6 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Qu } } -func createAndRunPriorityQueue(lessFn framework.LessFunc, opts ...Option) *PriorityQueue { - q := NewPriorityQueue(lessFn, opts...) - q.Run() - return q -} - func TestBackOffFlow(t *testing.T) { cl := clock.NewFakeClock(time.Now()) q := NewPriorityQueue(newDefaultQueueSort(), WithClock(cl)) @@ -1558,11 +1526,9 @@ func TestBackOffFlow(t *testing.T) { // An event happens. q.MoveAllToActiveOrBackoffQueue("deleted pod") - q.lock.RLock() if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { t.Errorf("pod %v is not in the backoff queue", podID) } - q.lock.RUnlock() // Check backoff duration. deadline := q.getBackoffTime(podInfo) @@ -1575,19 +1541,15 @@ func TestBackOffFlow(t *testing.T) { cl.Step(time.Millisecond) q.flushBackoffQCompleted() // Still in backoff queue after an early flush. - q.lock.RLock() if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { t.Errorf("pod %v is not in the backoff queue", podID) } - q.lock.RUnlock() // Moved out of the backoff queue after timeout. cl.Step(backoff) q.flushBackoffQCompleted() - q.lock.RLock() if _, ok, _ := q.podBackoffQ.Get(podInfo); ok { t.Errorf("pod %v is still in the backoff queue", podID) } - q.lock.RUnlock() }) } }