diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index 6286bbf85b7..64c1cd5b53c 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -283,8 +283,10 @@ func TestSchedulerWithExtenders(t *testing.T) { for _, name := range test.nodes { cache.AddNode(createNode(name)) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( - test.registerPlugins, "", + test.registerPlugins, "", ctx.Done(), runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), @@ -304,7 +306,7 @@ func TestSchedulerWithExtenders(t *testing.T) { emptySnapshot, schedulerapi.DefaultPercentageOfNodesToScore) podIgnored := &v1.Pod{} - result, err := scheduler.SchedulePod(context.Background(), fwk, framework.NewCycleState(), podIgnored) + result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podIgnored) if test.expectsErr { if err == nil { t.Errorf("Unexpected non-error, result %+v", result) diff --git a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go index f09a9d0cd84..ac8c4fe0f86 100644 --- a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go +++ b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go @@ -25,7 +25,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -53,6 +52,9 @@ func TestDefaultBinder(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var gotBinding *v1.Binding client := fake.NewSimpleClientset(testPod) client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { @@ -66,12 +68,12 @@ func TestDefaultBinder(t *testing.T) { return true, gotBinding, nil }) - fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithClientSet(client)) + fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithClientSet(client)) if err != nil { t.Fatal(err) } binder := &DefaultBinder{handle: fh} - status := binder.Bind(context.Background(), nil, testPod, "foohost.kubernetes.mydomain.com") + status := binder.Bind(ctx, nil, testPod, "foohost.kubernetes.mydomain.com") if got := status.AsError(); (tt.injectErr != nil) != (got != nil) { t.Errorf("got error %q, want %q", got, tt.injectErr) } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 9cc7641a5cd..b73c6194b2c 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -351,7 +351,9 @@ func TestPostFilter(t *testing.T) { if tt.extender != nil { extenders = append(extenders, tt.extender) } - f, err := st.NewFramework(registeredPlugins, "", + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), frameworkruntime.WithClientSet(cs), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithInformerFactory(informerFactory), @@ -371,11 +373,11 @@ func TestPostFilter(t *testing.T) { state := framework.NewCycleState() // Ensure is populated. - if _, status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() { + if _, status := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { t.Errorf("Unexpected PreFilter Status: %v", status) } - gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses) + gotResult, gotStatus := p.PostFilter(ctx, state, tt.pod, tt.filteredNodesStatuses) // As we cannot compare two errors directly due to miss the equal method for how to compare two errors, so just need to compare the reasons. if gotStatus.Code() == framework.Error { if diff := cmp.Diff(tt.wantStatus.Reasons(), gotStatus.Reasons()); diff != "" { @@ -1083,8 +1085,11 @@ func TestDryRunPreemption(t *testing.T) { // or minCandidateNodesAbsolute. This is only done in a handful of tests. parallelism = 1 } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( - registeredPlugins, "", + registeredPlugins, "", ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), @@ -1094,8 +1099,6 @@ func TestDryRunPreemption(t *testing.T) { t.Fatal(err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) @@ -1125,7 +1128,7 @@ func TestDryRunPreemption(t *testing.T) { for cycle, pod := range tt.testPods { state := framework.NewCycleState() // Some tests rely on PreFilter plugin to compute its CycleState. - if _, status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() { + if _, status := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() { t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status) } pe := preemption.Evaluator{ @@ -1137,7 +1140,7 @@ func TestDryRunPreemption(t *testing.T) { Interface: pl, } offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos))) - got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates) + got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, tt.pdbs, offset, numCandidates) // Sort the values (inner victims) and the candidate itself (by its NominatedNodeName). for i := range got { victims := got[i].Victims().Pods @@ -1334,6 +1337,8 @@ func TestSelectBestCandidate(t *testing.T) { cs := clientsetfake.NewSimpleClientset(objs...) informerFactory := informers.NewSharedInformerFactory(cs, 0) snapshot := internalcache.NewSnapshot(tt.pods, nodes) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( []st.RegisterPluginFunc{ tt.registerPlugin, @@ -1341,6 +1346,7 @@ func TestSelectBestCandidate(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", + ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), ) @@ -1350,7 +1356,7 @@ func TestSelectBestCandidate(t *testing.T) { state := framework.NewCycleState() // Some tests rely on PreFilter plugin to compute its CycleState. - if _, status := fwk.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() { + if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { t.Errorf("Unexpected PreFilter Status: %v", status) } nodeInfos, err := snapshot.NodeInfos().List() @@ -1373,7 +1379,7 @@ func TestSelectBestCandidate(t *testing.T) { Interface: pl, } offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos))) - candidates, _, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates) + candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates) s := pe.SelectCandidate(candidates) if s == nil || len(s.Name()) == 0 { return @@ -1445,7 +1451,9 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } - f, err := st.NewFramework(registeredPlugins, "", + stopCh := make(chan struct{}) + defer close(stopCh) + f, err := st.NewFramework(registeredPlugins, "", stopCh, frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), ) if err != nil { @@ -1639,10 +1647,10 @@ func TestPreempt(t *testing.T) { return true, nil, nil }) - stop := make(chan struct{}) - defer close(stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - cache := internalcache.New(time.Duration(0), stop) + cache := internalcache.New(time.Duration(0), ctx.Done()) for _, pod := range test.pods { cache.AddPod(pod) } @@ -1678,6 +1686,7 @@ func TestPreempt(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", + ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithExtenders(extenders), @@ -1691,7 +1700,7 @@ func TestPreempt(t *testing.T) { state := framework.NewCycleState() // Some tests rely on PreFilter plugin to compute its CycleState. - if _, s := fwk.RunPreFilterPlugins(context.Background(), state, test.pod); !s.IsSuccess() { + if _, s := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() { t.Errorf("Unexpected preFilterStatus: %v", s) } // Call preempt and check the expected results. @@ -1710,7 +1719,7 @@ func TestPreempt(t *testing.T) { State: state, Interface: &pl, } - res, status := pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap)) + res, status := pe.Preempt(ctx, test.pod, make(framework.NodeToStatusMap)) if !status.IsSuccess() && !status.IsUnschedulable() { t.Errorf("unexpected error in preemption: %v", status.AsError()) } @@ -1746,7 +1755,7 @@ func TestPreempt(t *testing.T) { } // Call preempt again and make sure it doesn't preempt any more pods. - res, status = pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap)) + res, status = pe.Preempt(ctx, test.pod, make(framework.NodeToStatusMap)) if !status.IsSuccess() && !status.IsUnschedulable() { t.Errorf("unexpected error in preemption: %v", status.AsError()) } diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go index 8995f21f982..47edef3477e 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go @@ -25,7 +25,6 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -332,16 +331,18 @@ func TestImageLocalityPriority(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - snapshot := cache.NewSnapshot(nil, test.nodes) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + snapshot := cache.NewSnapshot(nil, test.nodes) state := framework.NewCycleState() - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) p, _ := New(nil, fh) var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index 37e66275f85..651f76baa06 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -24,7 +24,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -1133,15 +1132,18 @@ func TestNodeAffinityPriority(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + state := framework.NewCycleState() - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes))) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes))) p, err := New(&test.args, fh) if err != nil { t.Fatalf("Creating plugin: %v", err) } var status *framework.Status if !test.disablePreScore { - status = p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes) + status = p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } @@ -1149,14 +1151,14 @@ func TestNodeAffinityPriority(t *testing.T) { var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score}) } - status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) + status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go index 694fae89355..97a22283855 100644 --- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go @@ -24,7 +24,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -351,10 +350,12 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { snapshot := cache.NewSnapshot(test.pods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{}) for i := range test.nodes { - hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name) + hostResult, err := p.(framework.ScorePlugin).Score(ctx, nil, test.pod, test.nodes[i].Name) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index f76de155105..08188da4998 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -24,7 +24,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -746,9 +745,12 @@ func TestFitScore(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) args := test.nodeResourcesFitArgs p, err := NewFit(&args, fh, plfeature.Features{}) if err != nil { @@ -757,7 +759,7 @@ func TestFitScore(t *testing.T) { var gotPriorities framework.NodeScoreList for _, n := range test.nodes { - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go index 4306b8f1841..011c351eea4 100644 --- a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go @@ -23,7 +23,6 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -375,9 +374,12 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit( &config.NodeResourcesFitArgs{ @@ -396,7 +398,7 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) { var gotScores framework.NodeScoreList for _, n := range test.nodes { - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go index 211c397a5f6..d60a480d517 100644 --- a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go @@ -23,7 +23,6 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -331,9 +330,12 @@ func TestMostAllocatedScoringStrategy(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit( &config.NodeResourcesFitArgs{ @@ -352,7 +354,7 @@ func TestMostAllocatedScoringStrategy(t *testing.T) { var gotScores framework.NodeScoreList for _, n := range test.nodes { - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go index 0e72b8d2f84..71348ccbc27 100644 --- a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go @@ -108,9 +108,12 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit(&config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ @@ -131,7 +134,7 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) { var gotScores framework.NodeScoreList for _, n := range test.nodes { - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go index 863f4b48c68..6a602ea32fc 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go @@ -27,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -498,7 +497,7 @@ func TestPreScoreStateEmptyNodes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0) - f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, + f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { @@ -1382,7 +1381,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() informerFactory := informers.NewSharedInformerFactory(client, 0) - f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, + f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go index 509b11e54a4..c69b272017b 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go @@ -21,7 +21,6 @@ import ( "testing" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -70,7 +69,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) { b.Errorf("error waiting for informer cache sync") } } - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory)) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory)) pl, err := New(nil, fh) if err != nil { b.Fatal(err) diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go index 3572197ad73..0e2ad527e6c 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go @@ -403,7 +403,7 @@ func TestSelectorSpreadScore(t *testing.T) { if err != nil { t.Errorf("error creating informerFactory: %+v", err) } - fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) + fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { t.Errorf("error creating new framework handle: %+v", err) } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go index 2e5e6d325f4..15889c40420 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -229,26 +228,29 @@ func TestTaintTolerationScore(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + state := framework.NewCycleState() snapshot := cache.NewSnapshot(nil, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) p, _ := New(nil, fh) - status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes) + status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name - score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName) + score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score}) } - status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList) + status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 14ecc9d8785..b6a4928b6b8 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -279,8 +279,10 @@ func TestDryRunPreemption(t *testing.T) { } informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) parallelism := parallelize.DefaultParallelism + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( - registeredPlugins, "", + registeredPlugins, "", ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), @@ -290,8 +292,6 @@ func TestDryRunPreemption(t *testing.T) { t.Fatal(err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) snapshot := internalcache.NewSnapshot(tt.initPods, tt.nodes) diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index a26afe96af1..c1f6f1d329c 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -378,7 +378,7 @@ var ( errInjectedFilterStatus = errors.New("injected filter status") ) -func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) { +func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) { if _, ok := r[queueSortPlugin]; !ok { r[queueSortPlugin] = newQueueSortPlugin } @@ -392,7 +392,7 @@ func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerPr if len(profile.Plugins.Bind.Enabled) == 0 { profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin}) } - return NewFramework(r, &profile, wait.NeverStop, opts...) + return NewFramework(r, &profile, stopCh, opts...) } func TestInitFrameworkWithScorePlugins(t *testing.T) { @@ -433,7 +433,9 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: tt.plugins} - _, err := newFrameworkWithQueueSortAndBind(registry, profile) + stopCh := make(chan struct{}) + defer close(stopCh) + _, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh) if tt.initErr && err == nil { t.Fatal("Framework initialization should fail") } @@ -797,7 +799,9 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, wait.NeverStop) + stopCh := make(chan struct{}) + defer close(stopCh) + fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, stopCh) if err != nil { if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) { t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr) @@ -962,7 +966,9 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { got := make(map[framework.ClusterEvent]sets.String) profile := config.KubeSchedulerProfile{Plugins: cfgPls} - _, err := newFrameworkWithQueueSortAndBind(registry, profile, WithClusterEventMap(got)) + stopCh := make(chan struct{}) + defer close(stopCh) + _, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh, WithClusterEventMap(got)) if err != nil { t.Fatal(err) } @@ -1167,12 +1173,14 @@ func TestRunScorePlugins(t *testing.T) { Plugins: tt.plugins, PluginConfig: tt.pluginConfigs, } - f, err := newFrameworkWithQueueSortAndBind(registry, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - res, status := f.RunScorePlugins(context.Background(), state, pod, nodes) + res, status := f.RunScorePlugins(ctx, state, pod, nodes) if tt.err { if status.IsSuccess() { @@ -1206,13 +1214,16 @@ func TestPreFilterPlugins(t *testing.T) { plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithExtensionsPluginName}, {Name: preFilterPluginName}}}} t.Run("TestPreFilterPlugin", func(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: plugins} - f, err := newFrameworkWithQueueSortAndBind(r, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - f.RunPreFilterPlugins(context.Background(), nil, nil) - f.RunPreFilterExtensionAddPod(context.Background(), nil, nil, nil, nil) - f.RunPreFilterExtensionRemovePod(context.Background(), nil, nil, nil, nil) + f.RunPreFilterPlugins(ctx, nil, nil) + f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil) + f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil) if preFilter1.PreFilterCalled != 1 { t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled) @@ -1396,11 +1407,13 @@ func TestFilterPlugins(t *testing.T) { config.Plugin{Name: pl.name}) } profile := config.KubeSchedulerProfile{Plugins: cfgPls} - f, err := newFrameworkWithQueueSortAndBind(registry, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) if err != nil { t.Fatalf("fail to create framework: %s", err) } - gotStatusMap := f.RunFilterPlugins(context.TODO(), nil, pod, nil) + gotStatusMap := f.RunFilterPlugins(ctx, nil, pod, nil) gotStatus := gotStatusMap.Merge() if !reflect.DeepEqual(gotStatus, tt.wantStatus) { t.Errorf("wrong status code. got: %v, want:%v", gotStatus, tt.wantStatus) @@ -1478,11 +1491,13 @@ func TestPostFilterPlugins(t *testing.T) { ) } profile := config.KubeSchedulerProfile{Plugins: cfgPls} - f, err := newFrameworkWithQueueSortAndBind(registry, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) if err != nil { t.Fatalf("fail to create framework: %s", err) } - _, gotStatus := f.RunPostFilterPlugins(context.TODO(), nil, pod, nil) + _, gotStatus := f.RunPostFilterPlugins(ctx, nil, pod, nil) if !reflect.DeepEqual(gotStatus, tt.wantStatus) { t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus) } @@ -1625,12 +1640,14 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: nodeName}) } profile := config.KubeSchedulerProfile{Plugins: cfgPls} - f, err := newFrameworkWithQueueSortAndBind(registry, profile, WithPodNominator(podNominator)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done(), WithPodNominator(podNominator)) if err != nil { t.Fatalf("fail to create framework: %s", err) } tt.nodeInfo.SetNode(tt.node) - gotStatus := f.RunFilterPluginsWithNominatedPods(context.TODO(), nil, tt.pod, tt.nodeInfo) + gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo) if !reflect.DeepEqual(gotStatus, tt.wantStatus) { t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus) } @@ -1780,12 +1797,14 @@ func TestPreBindPlugins(t *testing.T) { ) } profile := config.KubeSchedulerProfile{Plugins: configPlugins} - f, err := newFrameworkWithQueueSortAndBind(registry, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) if err != nil { t.Fatalf("fail to create framework: %s", err) } - status := f.RunPreBindPlugins(context.TODO(), nil, pod, "") + status := f.RunPreBindPlugins(ctx, nil, pod, "") if !reflect.DeepEqual(status, tt.wantStatus) { t.Errorf("wrong status code. got %v, want %v", status, tt.wantStatus) @@ -1936,12 +1955,14 @@ func TestReservePlugins(t *testing.T) { ) } profile := config.KubeSchedulerProfile{Plugins: configPlugins} - f, err := newFrameworkWithQueueSortAndBind(registry, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) if err != nil { t.Fatalf("fail to create framework: %s", err) } - status := f.RunReservePluginsReserve(context.TODO(), nil, pod, "") + status := f.RunReservePluginsReserve(ctx, nil, pod, "") if !reflect.DeepEqual(status, tt.wantStatus) { t.Errorf("wrong status code. got %v, want %v", status, tt.wantStatus) @@ -2060,12 +2081,14 @@ func TestPermitPlugins(t *testing.T) { ) } profile := config.KubeSchedulerProfile{Plugins: configPlugins} - f, err := newFrameworkWithQueueSortAndBind(registry, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) if err != nil { t.Fatalf("fail to create framework: %s", err) } - status := f.RunPermitPlugins(context.TODO(), nil, pod, "") + status := f.RunPermitPlugins(ctx, nil, pod, "") if !reflect.DeepEqual(status, tt.want) { t.Errorf("wrong status code. got %v, want %v", status, tt.want) @@ -2236,8 +2259,9 @@ func TestRecordingMetrics(t *testing.T) { SchedulerName: testProfileName, Plugins: plugins, } - f, err := newFrameworkWithQueueSortAndBind(r, profile, withMetricsRecorder(recorder)) + f, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder)) if err != nil { + close(stopCh) t.Fatalf("Failed to create framework for testing: %v", err) } @@ -2346,8 +2370,9 @@ func TestRunBindPlugins(t *testing.T) { SchedulerName: testProfileName, Plugins: plugins, } - fwk, err := newFrameworkWithQueueSortAndBind(r, profile, withMetricsRecorder(recorder)) + fwk, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder)) if err != nil { + close(stopCh) t.Fatal(err) } @@ -2400,13 +2425,15 @@ func TestPermitWaitDurationMetric(t *testing.T) { Permit: config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}}, } profile := config.KubeSchedulerProfile{Plugins: plugins} - f, err := newFrameworkWithQueueSortAndBind(r, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - f.RunPermitPlugins(context.TODO(), nil, pod, "") - f.WaitOnPermit(context.TODO(), pod) + f.RunPermitPlugins(ctx, nil, pod, "") + f.WaitOnPermit(ctx, pod) collectAndComparePermitWaitDuration(t, tt.wantRes) }) @@ -2454,12 +2481,14 @@ func TestWaitOnPermit(t *testing.T) { Permit: config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}}, } profile := config.KubeSchedulerProfile{Plugins: plugins} - f, err := newFrameworkWithQueueSortAndBind(r, profile) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } - runPermitPluginsStatus := f.RunPermitPlugins(context.Background(), nil, pod, "") + runPermitPluginsStatus := f.RunPermitPlugins(ctx, nil, pod, "") if runPermitPluginsStatus.Code() != framework.Wait { t.Fatalf("Expected RunPermitPlugins to return status %v, but got %v", framework.Wait, runPermitPluginsStatus.Code()) @@ -2467,7 +2496,7 @@ func TestWaitOnPermit(t *testing.T) { go tt.action(f) - got := f.WaitOnPermit(context.Background(), pod) + got := f.WaitOnPermit(ctx, pod) if !reflect.DeepEqual(tt.want, got) { t.Errorf("Unexpected status: want %v, but got %v", tt.want, got) } @@ -2505,7 +2534,9 @@ func TestListPlugins(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: tt.plugins} - f, err := newFrameworkWithQueueSortAndBind(registry, profile) + stopCh := make(chan struct{}) + defer close(stopCh) + f, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index bd58ae7a0e6..7807430e0a2 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -573,17 +573,17 @@ func TestSchedulerScheduleOne(t *testing.T) { st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework(registerPluginFuncs, testSchedulerName, + ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName))) if err != nil { t.Fatal(err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - s := newScheduler( cache, nil, @@ -1046,22 +1046,22 @@ func TestSchedulerBinding(t *testing.T) { } return false, nil, nil }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework([]st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - }, "", frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{})) + }, "", ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{})) if err != nil { t.Fatal(err) } - stop := make(chan struct{}) - defer close(stop) sched := &Scheduler{ Extenders: test.extenders, - Cache: internalcache.New(100*time.Millisecond, stop), + Cache: internalcache.New(100*time.Millisecond, ctx.Done()), nodeInfoSnapshot: nil, percentageOfNodesToScore: 0, } - err = sched.bind(context.Background(), fwk, pod, "node", nil) + err = sched.bind(ctx, fwk, pod, "node", nil) if err != nil { t.Error(err) } @@ -2012,7 +2012,7 @@ func TestSchedulerSchedulePod(t *testing.T) { } snapshot := internalcache.NewSnapshot(test.pods, nodes) fwk, err := st.NewFramework( - test.registerPlugins, "", + test.registerPlugins, "", ctx.Done(), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), @@ -2063,6 +2063,9 @@ func TestSchedulerSchedulePod(t *testing.T) { func TestFindFitAllError(t *testing.T) { nodes := makeNodeList([]string{"3", "2", "1"}) scheduler := makeScheduler(nodes) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fwk, err := st.NewFramework( []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), @@ -2071,13 +2074,14 @@ func TestFindFitAllError(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", + ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { t.Fatal(err) } - _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) + _, diagnosis, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2098,6 +2102,9 @@ func TestFindFitAllError(t *testing.T) { func TestFindFitSomeError(t *testing.T) { nodes := makeNodeList([]string{"3", "2", "1"}) scheduler := makeScheduler(nodes) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fwk, err := st.NewFramework( []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), @@ -2106,6 +2113,7 @@ func TestFindFitSomeError(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", + ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { @@ -2113,7 +2121,7 @@ func TestFindFitSomeError(t *testing.T) { } pod := st.MakePod().Name("1").UID("1").Obj() - _, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod) + _, diagnosis, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), pod) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2177,8 +2185,10 @@ func TestFindFitPredicateCallCounts(t *testing.T) { registerFakeFilterFunc, st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( - registerPlugins, "", + registerPlugins, "", ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { @@ -2192,7 +2202,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { fwk.AddNominatedPod(framework.NewPodInfo(st.MakePod().UID("nominated").Priority(midPriority).Obj()), &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"}) - _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) + _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2311,8 +2321,10 @@ func TestZeroRequest(t *testing.T) { st.RegisterPreScorePlugin(selectorspread.Name, selectorspread.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( - pluginRegistrations, "", + pluginRegistrations, "", ctx.Done(), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), @@ -2333,7 +2345,6 @@ func TestZeroRequest(t *testing.T) { snapshot, schedulerapi.DefaultPercentageOfNodesToScore) - ctx := context.Background() state := framework.NewCycleState() _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod) if err != nil { @@ -2416,6 +2427,8 @@ func TestFairEvaluationForNodes(t *testing.T) { } nodes := makeNodeList(nodeNames) sched := makeScheduler(nodes) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), @@ -2423,6 +2436,7 @@ func TestFairEvaluationForNodes(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", + ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { @@ -2435,7 +2449,7 @@ func TestFairEvaluationForNodes(t *testing.T) { // Iterating over all nodes more than twice for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ { - nodesThatFit, _, err := sched.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) + nodesThatFit, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2496,8 +2510,10 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { registerFakeFilterFunc, st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fwk, err := st.NewFramework( - registerPlugins, "", + registerPlugins, "", ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) @@ -2516,7 +2532,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { snapshot, schedulerapi.DefaultPercentageOfNodesToScore) - _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) + _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2657,6 +2673,7 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c fwk, _ := st.NewFramework( fns, testSchedulerName, + ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithInformerFactory(informerFactory), diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 839175488b8..8b1244d61e5 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -451,6 +451,7 @@ func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue intern eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) fwk, err := st.NewFramework(registerPluginFuncs, testSchedulerName, + stop, frameworkruntime.WithClientSet(client), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), @@ -552,7 +553,9 @@ func TestInitPluginsWithIndexers(t *testing.T) { st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), ) - _, err := st.NewFramework(registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory)) + stopCh := make(chan struct{}) + defer close(stopCh) + _, err := st.NewFramework(registerPluginFuncs, "test", stopCh, frameworkruntime.WithInformerFactory(fakeInformerFactory)) if len(tt.wantErr) > 0 { if err == nil || !strings.Contains(err.Error(), tt.wantErr) { diff --git a/pkg/scheduler/testing/framework_helpers.go b/pkg/scheduler/testing/framework_helpers.go index 66a6eb56167..c80560e9108 100644 --- a/pkg/scheduler/testing/framework_helpers.go +++ b/pkg/scheduler/testing/framework_helpers.go @@ -18,7 +18,6 @@ package testing import ( "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kube-scheduler/config/v1beta2" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" @@ -29,7 +28,7 @@ import ( var configDecoder = scheme.Codecs.UniversalDecoder() // NewFramework creates a Framework from the register functions and options. -func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime.Option) (framework.Framework, error) { +func NewFramework(fns []RegisterPluginFunc, profileName string, stopCh <-chan struct{}, opts ...runtime.Option) (framework.Framework, error) { registry := runtime.Registry{} profile := &schedulerapi.KubeSchedulerProfile{ SchedulerName: profileName, @@ -38,7 +37,7 @@ func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime. for _, f := range fns { f(®istry, profile) } - return runtime.NewFramework(registry, profile, wait.NeverStop, opts...) + return runtime.NewFramework(registry, profile, stopCh, opts...) } // RegisterPluginFunc is a function signature used in method RegisterFilterPlugin()