From 939e98135cde89f55944b93d2774389f8dceaa38 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Sat, 22 Jan 2022 11:00:56 -0800 Subject: [PATCH] sched: fix goroutine leak in unit tests --- pkg/scheduler/eventhandlers_test.go | 7 +- .../interpodaffinity/filtering_test.go | 6 +- .../plugins/interpodaffinity/scoring_test.go | 6 +- .../podtopologyspread/filtering_test.go | 3 +- .../plugins/podtopologyspread/scoring_test.go | 6 +- .../volume_restrictions_test.go | 27 ++-- pkg/scheduler/generic_scheduler_test.go | 3 +- .../internal/queue/scheduling_queue_test.go | 119 +++++++++++++----- pkg/scheduler/scheduler_test.go | 77 ++++++------ 9 files changed, 165 insertions(+), 89 deletions(-) diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 0f53940fd9f..e092697f449 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -422,10 +422,11 @@ func TestAddAllEventHandlers(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() testSched := Scheduler{ - StopEverything: stopCh, - SchedulingQueue: queue.NewTestQueue(context.Background(), nil), + StopEverything: ctx.Done(), + SchedulingQueue: queue.NewTestQueue(ctx, nil), } client := fake.NewSimpleClientset() diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go index f243459fbdc..5c1774bd655 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go @@ -995,7 +995,8 @@ func TestRequiredAffinitySingleNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node}) fts := feature.Features{EnablePodAffinityNamespaceSelector: !test.disableNSSelector} p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(fts, New), &config.InterPodAffinityArgs{}, snapshot, namespaces) @@ -1860,7 +1861,8 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) { for indexTest, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() snapshot := cache.NewSnapshot(test.pods, test.nodes) p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, []runtime.Object{ diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go index 6eb0f541096..031319f033b 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go @@ -742,7 +742,8 @@ func TestPreferredAffinity(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() state := framework.NewCycleState() fts := feature.Features{EnablePodAffinityNamespaceSelector: !test.disableNSSelector} p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(fts, New), &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, cache.NewSnapshot(test.pods, test.nodes), namespaces) @@ -904,7 +905,8 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() state := framework.NewCycleState() fts := feature.Features{EnablePodAffinityNamespaceSelector: !test.disableNSSelector} p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(fts, New), &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, cache.NewSnapshot(test.pods, test.nodes), namespaces) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go index 9789dbf518e..ef4a7ee3c96 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go @@ -513,7 +513,8 @@ func TestPreFilterState(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() args := &config.PodTopologySpreadArgs{ DefaultConstraints: tt.defaultConstraints, DefaultingType: config.ListDefaulting, diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go index 0813ae28957..f5c9c84430e 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go @@ -244,7 +244,8 @@ func TestPreScoreStateEmptyNodes(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0) f, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)), @@ -869,7 +870,8 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) { client := fake.NewSimpleClientset( &v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}}, ) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() informerFactory := informers.NewSharedInformerFactory(client, 0) f, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go index 2e3d3e7c091..8f8b30e592b 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go @@ -73,7 +73,9 @@ func TestGCEDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := newPlugin(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := newPlugin(ctx, t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -121,7 +123,9 @@ func TestAWSDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := newPlugin(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := newPlugin(ctx, t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -175,7 +179,9 @@ func TestRBDDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := newPlugin(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := newPlugin(ctx, t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -229,7 +235,9 @@ func TestISCSIDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := newPlugin(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := newPlugin(ctx, t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -355,7 +363,9 @@ func TestAccessModeConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := newPluginWithListers(t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod) gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus) @@ -364,12 +374,11 @@ func TestAccessModeConflicts(t *testing.T) { } } -func newPlugin(t *testing.T) framework.Plugin { - return newPluginWithListers(t, nil, nil, nil, true) +func newPlugin(ctx context.Context, t *testing.T) framework.Plugin { + return newPluginWithListers(ctx, t, nil, nil, nil, true) } -func newPluginWithListers(t *testing.T, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, enableReadWriteOncePod bool) framework.Plugin { - ctx := context.Background() +func newPluginWithListers(ctx context.Context, t *testing.T, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, enableReadWriteOncePod bool) framework.Plugin { pluginFactory := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { return New(plArgs, fh, feature.Features{ EnableReadWriteOncePod: enableReadWriteOncePod, diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go index c3dbc80b182..c0da6f3f83a 100644 --- a/pkg/scheduler/generic_scheduler_test.go +++ b/pkg/scheduler/generic_scheduler_test.go @@ -976,7 +976,8 @@ func TestGenericScheduler(t *testing.T) { cache.AddNode(node) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cs := clientsetfake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(cs, 0) for _, pvc := range test.pvcs { diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 789dd94351d..1add2f561eb 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -131,7 +131,9 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { func TestPriorityQueue_Add(t *testing.T) { objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -174,7 +176,9 @@ func newDefaultQueueSort() framework.LessFunc { func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { objs := []runtime.Object{medPriorityPodInfo.Pod, highPriorityPodInfo.Pod} - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -191,7 +195,9 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { objs := []runtime.Object{highPriNominatedPodInfo.Pod, unschedulablePodInfo.Pod} - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) q.Add(highPriNominatedPodInfo.Pod) q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle()) @@ -223,7 +229,9 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { // Pods in and before current scheduling cycle will be put back to activeQueue // if we were trying to schedule them when we received move request. func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(time.Now()))) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(time.Now()))) totalNum := 10 expectedPods := make([]v1.Pod, 0, totalNum) for i := 0; i < totalNum; i++ { @@ -289,7 +297,9 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { func TestPriorityQueue_Pop(t *testing.T) { objs := []runtime.Object{medPriorityPodInfo.Pod} - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -308,7 +318,9 @@ func TestPriorityQueue_Pop(t *testing.T) { func TestPriorityQueue_Update(t *testing.T) { objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod} c := testingclock.NewFakeClock(time.Now()) - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs, WithClock(c)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs, WithClock(c)) q.Update(nil, highPriorityPodInfo.Pod) if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name) @@ -389,7 +401,9 @@ func TestPriorityQueue_Update(t *testing.T) { func TestPriorityQueue_Delete(t *testing.T) { objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod} - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Add(unschedulablePodInfo.Pod) if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil { @@ -449,7 +463,9 @@ func TestPriorityQueue_Activate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var objs []runtime.Object - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) // Prepare activeQ/unschedulableQ/podBackoffQ according to the table for _, qPodInfo := range tt.qPodInfoInActiveQ { @@ -554,7 +570,9 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { } } - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) // Init pods in unschedulableQ. for j := 0; j < podsInUnschedulableQ; j++ { @@ -600,7 +618,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { m := map[framework.ClusterEvent]sets.String{ {Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fooPlugin"), } - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) q.Add(medPriorityPodInfo.Pod) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) @@ -687,7 +707,9 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) m := map[framework.ClusterEvent]sets.String{AssignedPodAdd: sets.NewString("fakePlugin")} - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) q.Add(medPriorityPodInfo.Pod) // Add a couple of pods to the unschedulableQ. q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle()) @@ -712,7 +734,9 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) q.Add(medPriorityPodInfo.Pod) q.Add(unschedulablePodInfo.Pod) q.Add(highPriorityPodInfo.Pod) @@ -766,7 +790,8 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) { // Build a PriorityQueue. q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewPodNominator(podLister))) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) @@ -793,7 +818,9 @@ func TestPriorityQueue_PendingPods(t *testing.T) { return pendingSet } - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort()) q.Add(medPriorityPodInfo.Pod) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle()) @@ -811,7 +838,9 @@ func TestPriorityQueue_PendingPods(t *testing.T) { func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} - q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs) if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } @@ -882,7 +911,9 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { } func TestPriorityQueue_NewWithOptions(t *testing.T) { - q := NewTestQueue(context.Background(), + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithPodInitialBackoffDuration(2*time.Second), WithPodMaxBackoffDuration(20*time.Second), @@ -1048,7 +1079,9 @@ func TestUnschedulablePodsMap(t *testing.T) { } func TestSchedulingQueue_Close(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort()) wantErr := fmt.Errorf(queueClosed) wg := sync.WaitGroup{} wg.Add(1) @@ -1072,7 +1105,9 @@ func TestSchedulingQueue_Close(t *testing.T) { // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c)) // Add a few pods to priority queue. for i := 0; i < 5; i++ { p := v1.Pod{ @@ -1129,7 +1164,9 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { // are frequent events that move pods to the active queue. func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c)) // Add an unschedulable pod to a priority queue. // This makes a situation that the pod was tried to schedule @@ -1220,7 +1257,9 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // TestHighPriorityBackoff tests that a high priority pod does not block // other pods if it is unschedulable func TestHighPriorityBackoff(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort()) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1287,7 +1326,9 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { m := map[framework.ClusterEvent]sets.String{ NodeAdd: sets.NewString("fakePlugin"), } - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1471,7 +1512,9 @@ func TestPodTimestamp(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) var podInfoList []*framework.QueuedPodInfo for i, op := range test.operations { @@ -1628,7 +1671,9 @@ scheduler_pending_pods{queue="unschedulable"} 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { resetMetrics() - queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1657,7 +1702,9 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 1: A pod is created and scheduled after 1 attempt. The queue operations are // Add -> Pop. c := testingclock.NewFakeClock(timestamp) - queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err := queue.Pop() if err != nil { @@ -1668,7 +1715,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 2: A pod is created and scheduled after 2 attempts. The queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop. c = testingclock.NewFakeClock(timestamp) - queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) + queue = NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1688,7 +1735,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 3: Similar to case 2, but before the second pop, call update, the queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop. c = testingclock.NewFakeClock(timestamp) - queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c)) + queue = NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1786,7 +1833,9 @@ func TestIncomingPodsMetrics(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { metrics.SchedulerQueueIncomingPods.Reset() - queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) for _, op := range test.operations { for _, pInfo := range pInfos { op(queue, pInfo) @@ -1812,7 +1861,9 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Qu func TestBackOffFlow(t *testing.T) { cl := testingclock.NewFakeClock(time.Now()) - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(cl)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(cl)) steps := []struct { wantBackoff time.Duration }{ @@ -1971,7 +2022,9 @@ func TestPodMatchesEvent(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort()) q.clusterEventMap = tt.clusterEventMap if got := q.podMatchesEvent(tt.podInfo, tt.event); got != tt.want { t.Errorf("Want %v, but got %v", tt.want, got) @@ -2026,7 +2079,9 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort()) for _, podInfo := range tt.podInfos { q.AddUnschedulableIfNotPresent(podInfo, q.schedulingCycle) } @@ -2100,7 +2155,9 @@ func TestPriorityQueue_calculateBackoffDuration(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithPodInitialBackoffDuration(tt.initialBackoffDuration), WithPodMaxBackoffDuration(tt.maxBackoffDuration)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithPodInitialBackoffDuration(tt.initialBackoffDuration), WithPodMaxBackoffDuration(tt.maxBackoffDuration)) if got := q.calculateBackoffDuration(tt.podInfo); got != tt.want { t.Errorf("PriorityQueue.calculateBackoffDuration() = %v, want %v", got, tt.want) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 10e6e8d4e8e..2a77bc6d540 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -438,6 +438,8 @@ func TestSchedulerScheduleOne(t *testing.T) { t.Fatal(err) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() s := &Scheduler{ SchedulerCache: sCache, Algorithm: item.algo, @@ -452,7 +454,7 @@ func TestSchedulerScheduleOne(t *testing.T) { Profiles: profile.Map{ testSchedulerName: fwk, }, - SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil), + SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), } called := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { @@ -462,7 +464,7 @@ func TestSchedulerScheduleOne(t *testing.T) { } close(called) }) - s.scheduleOne(context.Background()) + s.scheduleOne(ctx) <-called if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { t.Errorf("assumed pod: wanted %v, got %v", e, a) @@ -638,10 +640,10 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) { } func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { - stop := make(chan struct{}) - defer close(stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := internalcache.New(100*time.Millisecond, stop) + scache := internalcache.New(100*time.Millisecond, ctx.Done()) pod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) @@ -653,7 +655,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"), } - scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, pod, &node, fns...) + scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, pod, &node, fns...) waitPodExpireChan := make(chan struct{}) timeout := make(chan struct{}) @@ -689,7 +691,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { // We use conflicted pod ports to incur fit predicate failure if first pod not removed. secondPod := podWithPort("bar", "", 8080) queuedPodStore.Add(secondPod) - scheduler.scheduleOne(context.Background()) + scheduler.scheduleOne(ctx) select { case b := <-bindingChan: expectBinding := &v1.Binding{ @@ -705,10 +707,10 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { } func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { - stop := make(chan struct{}) - defer close(stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := internalcache.New(10*time.Minute, stop) + scache := internalcache.New(10*time.Minute, ctx.Done()) firstPod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) @@ -719,7 +721,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"), } - scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, firstPod, &node, fns...) + scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, firstPod, &node, fns...) // We use conflicted pod ports to incur fit predicate failure. secondPod := podWithPort("bar", "", 8080) @@ -727,7 +729,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { // queuedPodStore: [bar:8080] // cache: [(assumed)foo:8080] - scheduler.scheduleOne(context.Background()) + scheduler.scheduleOne(ctx) select { case err := <-errChan: expectErr := &framework.FitError{ @@ -760,7 +762,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { } queuedPodStore.Add(secondPod) - scheduler.scheduleOne(context.Background()) + scheduler.scheduleOne(ctx) select { case b := <-bindingChan: expectBinding := &v1.Binding{ @@ -777,19 +779,19 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. -func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, - informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, + informerFactory informers.SharedInformerFactory, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { - scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...) + scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...) - informerFactory.Start(stop) - informerFactory.WaitForCacheSync(stop) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) queuedPodStore.Add(pod) // queuedPodStore: [foo:8080] // cache: [] - scheduler.scheduleOne(context.Background()) + scheduler.scheduleOne(ctx) // queuedPodStore: [] // cache: [(assumed)foo:8080] @@ -809,10 +811,10 @@ func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcach } func TestSchedulerFailedSchedulingReasons(t *testing.T) { - stop := make(chan struct{}) - defer close(stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := internalcache.New(10*time.Minute, stop) + scache := internalcache.New(10*time.Minute, ctx.Done()) // Design the baseline for the pods, and we will make nodes that don't fit it later. var cpu = int64(4) @@ -865,13 +867,13 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(noderesources.Name, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit), "Filter", "PreFilter"), } - scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...) + scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...) - informerFactory.Start(stop) - informerFactory.WaitForCacheSync(stop) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) queuedPodStore.Add(podWithTooBigResourceRequests) - scheduler.scheduleOne(context.Background()) + scheduler.scheduleOne(ctx) select { case err := <-errChan: expectErr := &framework.FitError{ @@ -895,7 +897,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { bindingChan := make(chan *v1.Binding, 1) client := clientsetfake.NewSimpleClientset() client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { @@ -943,13 +945,13 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C testSchedulerName: fwk, }, client: client, - SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil), + SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), } return sched, bindingChan, errChan } -func setupTestSchedulerWithVolumeBinding(volumeBinder volumebinding.SchedulerVolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestSchedulerWithVolumeBinding(ctx context.Context, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) pod := podWithID("foo", "") @@ -957,7 +959,7 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder volumebinding.SchedulerVol pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol", VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}}) queuedPodStore.Add(pod) - scache := internalcache.New(10*time.Minute, stop) + scache := internalcache.New(10*time.Minute, ctx.Done()) scache.AddNode(&testNode) testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}} client := clientsetfake.NewSimpleClientset(&testNode, &testPVC) @@ -972,9 +974,9 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder volumebinding.SchedulerVol return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil }, "PreFilter", "Filter", "Reserve", "PreBind"), } - s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...) - informerFactory.Start(stop) - informerFactory.WaitForCacheSync(stop) + s, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, broadcaster, fns...) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) return s, bindingChan, errChan } @@ -1079,9 +1081,10 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { for _, item := range table { t.Run(item.name, func(t *testing.T) { - stop := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig) - s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster) + s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, fakeVolumeBinder, eventBroadcaster) eventChan := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { e, _ := obj.(*eventsv1.Event) @@ -1090,7 +1093,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { } close(eventChan) }) - s.scheduleOne(context.Background()) + s.scheduleOne(ctx) // Wait for pod to succeed or fail scheduling select { case <-eventChan: @@ -1127,8 +1130,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { if item.expectBindCalled != fakeVolumeBinder.BindCalled { t.Errorf("expectedBindCall %v", item.expectBindCalled) } - - close(stop) }) } }