From 1c05cf1d5166798f11be1b10b2115d562ee93dea Mon Sep 17 00:00:00 2001 From: Mengjiao Liu Date: Mon, 15 May 2023 18:36:17 +0800 Subject: [PATCH] kube-scheduler: NewFramework function to pass the context parameter Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com> --- cmd/kube-scheduler/app/server.go | 4 +- pkg/scheduler/extender_test.go | 3 +- .../framework_contract_test.go | 4 +- .../defaultbinder/default_binder_test.go | 6 +- .../default_preemption_test.go | 17 ++-- .../dynamicresources/dynamicresources_test.go | 2 +- .../imagelocality/image_locality_test.go | 6 +- .../nodeaffinity/node_affinity_test.go | 6 +- .../noderesources/balanced_allocation_test.go | 6 +- .../plugins/noderesources/fit_test.go | 14 ++- .../noderesources/least_allocated_test.go | 6 +- .../noderesources/most_allocated_test.go | 6 +- .../requested_to_capacity_ratio_test.go | 13 ++- .../podtopologyspread/filtering_test.go | 15 +-- .../plugins/podtopologyspread/scoring_test.go | 16 ++-- .../selector_spread_perf_test.go | 6 +- .../selectorspread/selector_spread_test.go | 12 ++- .../tainttoleration/taint_toleration_test.go | 6 +- .../framework/plugins/testing/testing.go | 6 +- .../volumebinding/volume_binding_test.go | 7 +- .../framework/preemption/preemption_test.go | 6 +- pkg/scheduler/framework/runtime/framework.go | 6 +- .../framework/runtime/framework_test.go | 92 ++++++++++--------- pkg/scheduler/profile/profile.go | 13 +-- pkg/scheduler/profile/profile_test.go | 8 +- pkg/scheduler/schedule_one_test.go | 39 ++++---- pkg/scheduler/scheduler.go | 13 +-- pkg/scheduler/scheduler_test.go | 36 ++++---- pkg/scheduler/testing/framework_helpers.go | 6 +- test/integration/daemonset/daemonset_test.go | 2 +- test/integration/util/util.go | 4 +- 31 files changed, 222 insertions(+), 164 deletions(-) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 9991611e372..7fcae39f817 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -345,11 +345,11 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions recorderFactory := getRecorderFactory(&cc) completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0) // Create the scheduler. - sched, err := scheduler.New(cc.Client, + sched, err := scheduler.New(ctx, + cc.Client, cc.InformerFactory, cc.DynInformerFactory, recorderFactory, - ctx.Done(), scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion), scheduler.WithKubeConfig(cc.KubeConfig), scheduler.WithProfiles(cc.ComponentConfig.Profiles...), diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index 97f148f52db..ff70e348d1d 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -288,7 +288,8 @@ func TestSchedulerWithExtenders(t *testing.T) { cache.AddNode(createNode(name)) } fwk, err := st.NewFramework( - test.registerPlugins, "", ctx.Done(), + ctx, + test.registerPlugins, "", runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), diff --git a/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go b/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go index 344ae28cea5..13feb969cad 100644 --- a/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go +++ b/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" ) @@ -42,8 +43,9 @@ func TestFrameworkContract(t *testing.T) { } func TestNewFramework(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) var f interface{} - if f, _ = runtime.NewFramework(nil, nil, nil); f != nil { + if f, _ = runtime.NewFramework(ctx, nil, nil); f != nil { _, ok := f.(framework.Framework) assert.True(t, ok) } diff --git a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go index 3ce1f83c157..5c63748eaaf 100644 --- a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go +++ b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" + "k8s.io/klog/v2/ktesting" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -52,7 +53,8 @@ func TestDefaultBinder(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() var gotBinding *v1.Binding @@ -68,7 +70,7 @@ func TestDefaultBinder(t *testing.T) { return true, gotBinding, nil }) - fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithClientSet(client)) + fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithClientSet(client)) if err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 496f5bd4a64..aa746c30673 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -355,7 +355,7 @@ func TestPostFilter(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), + f, err := st.NewFramework(ctx, registeredPlugins, "", frameworkruntime.WithClientSet(cs), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithInformerFactory(informerFactory), @@ -1093,7 +1093,8 @@ func TestDryRunPreemption(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() fwk, err := st.NewFramework( - registeredPlugins, "", ctx.Done(), + ctx, + registeredPlugins, "", frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), @@ -1346,13 +1347,13 @@ func TestSelectBestCandidate(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() fwk, err := st.NewFramework( + ctx, []st.RegisterPluginFunc{ tt.registerPlugin, st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithLogger(logger), @@ -1485,7 +1486,9 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() var nodes []*v1.Node for _, n := range test.nodes { nodes = append(nodes, st.MakeNode().Name(n).Obj()) @@ -1494,9 +1497,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } - stopCh := make(chan struct{}) - defer close(stopCh) - f, err := st.NewFramework(registeredPlugins, "", stopCh, + f, err := st.NewFramework(ctx, registeredPlugins, "", frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithLogger(logger), ) @@ -1730,13 +1731,13 @@ func TestPreempt(t *testing.T) { extenders = append(extenders, extender) } fwk, err := st.NewFramework( + ctx, []st.RegisterPluginFunc{ test.registerPlugin, st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithExtenders(extenders), diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 5d09769aa39..2382ea0894d 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -680,7 +680,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl runtime.WithClientSet(tc.client), runtime.WithInformerFactory(informerFactory), } - fh, err := runtime.NewFramework(nil, nil, tc.ctx.Done(), opts...) + fh, err := runtime.NewFramework(ctx, nil, nil, opts...) if err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go index 47edef3477e..26ee547f595 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -331,12 +332,13 @@ func TestImageLocalityPriority(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() snapshot := cache.NewSnapshot(nil, test.nodes) state := framework.NewCycleState() - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) p, _ := New(nil, fh) var gotList framework.NodeScoreList diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index d5b0350c53c..69778c27a09 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -1134,11 +1135,12 @@ func TestNodeAffinityPriority(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes))) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes))) p, err := New(&test.args, fh) if err != nil { t.Fatalf("Creating plugin: %v", err) diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go index 9c1b0a9d384..f461a3d6d55 100644 --- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -384,9 +385,10 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { snapshot := cache.NewSnapshot(test.pods, test.nodes) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{}) state := framework.NewCycleState() for i := range test.nodes { diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index c5982d6fd50..2ee4214683c 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -836,12 +837,13 @@ func TestFitScore(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) args := test.nodeResourcesFitArgs p, err := NewFit(&args, fh, plfeature.Features{}) if err != nil { @@ -958,6 +960,9 @@ func BenchmarkTestFitScore(b *testing.B) { for _, test := range tests { b.Run(test.name, func(b *testing.B) { + _, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) + defer cancel() existingPods := []*v1.Pod{ st.MakePod().Node("node1").Req(map[v1.ResourceName]string{"cpu": "2000", "memory": "4000"}).Obj(), } @@ -966,15 +971,14 @@ func BenchmarkTestFitScore(b *testing.B) { } state := framework.NewCycleState() var nodeResourcesFunc = runtime.FactoryAdapter(plfeature.Features{}, NewFit) - pl := plugintesting.SetupPlugin(b, nodeResourcesFunc, &test.nodeResourcesFitArgs, cache.NewSnapshot(existingPods, nodes)) + pl := plugintesting.SetupPlugin(ctx, b, nodeResourcesFunc, &test.nodeResourcesFitArgs, cache.NewSnapshot(existingPods, nodes)) p := pl.(*Fit) b.ResetTimer() requestedPod := st.MakePod().Req(map[v1.ResourceName]string{"cpu": "1000", "memory": "2000"}).Obj() for i := 0; i < b.N; i++ { - - _, status := p.Score(context.Background(), state, requestedPod, nodes[0].Name) + _, status := p.Score(ctx, state, requestedPod, nodes[0].Name) if !status.IsSuccess() { b.Errorf("unexpected status: %v", status) } diff --git a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go index 9c8558d01e5..3c1c1ffc0a3 100644 --- a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -385,12 +386,13 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit( &config.NodeResourcesFitArgs{ diff --git a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go index 1b9d3b784e2..03e56848066 100644 --- a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -342,12 +343,13 @@ func TestMostAllocatedScoringStrategy(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit( &config.NodeResourcesFitArgs{ diff --git a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go index 2a84d8a91a5..3d32999671f 100644 --- a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -103,12 +103,13 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit(&config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ @@ -303,7 +304,8 @@ func TestResourceBinPackingSingleExtended(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.pods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + _, ctx := ktesting.NewTestContext(t) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) args := config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ Type: config.RequestedToCapacityRatio, @@ -527,7 +529,8 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.pods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) + _, ctx := ktesting.NewTestContext(t) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) args := config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go index e5a7db05b61..b822b400b03 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go @@ -1986,7 +1986,7 @@ func TestPreFilterStateAddPod(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) p := pl.(*PodTopologySpread) p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy @@ -2302,7 +2302,7 @@ func TestPreFilterStateRemovePod(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) p := pl.(*PodTopologySpread) p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy @@ -2376,8 +2376,8 @@ func BenchmarkFilter(b *testing.B) { var state *framework.CycleState b.Run(tt.name, func(b *testing.B) { existingPods, allNodes, _ := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) - ctx := context.Background() - pl := plugintesting.SetupPlugin(b, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) + _, ctx := ktesting.NewTestContext(b) + pl := plugintesting.SetupPlugin(ctx, b, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) p := pl.(*PodTopologySpread) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -3007,7 +3007,7 @@ func TestSingleConstraint(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) p := pl.(*PodTopologySpread) p.enableMinDomainsInPodTopologySpread = tt.enableMinDomains p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy @@ -3352,7 +3352,7 @@ func TestMultipleConstraints(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) - pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) + pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) p := pl.(*PodTopologySpread) p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy state := framework.NewCycleState() @@ -3376,7 +3376,8 @@ func TestPreFilterDisabled(t *testing.T) { nodeInfo := framework.NewNodeInfo() node := v1.Node{} nodeInfo.SetNode(&node) - p := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewEmptySnapshot()) + _, ctx := ktesting.NewTestContext(t) + p := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewEmptySnapshot()) cycleState := framework.NewCycleState() gotStatus := p.(*PodTopologySpread).Filter(context.Background(), cycleState, pod, nodeInfo) wantStatus := framework.AsStatus(fmt.Errorf(`reading "PreFilterPodTopologySpread" from cycleState: %w`, framework.ErrNotFound)) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go index 40221ce67b3..9870f082ef8 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -526,10 +527,11 @@ func TestPreScoreStateEmptyNodes(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0) - f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), + f, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { @@ -1364,12 +1366,13 @@ func BenchmarkTestPodTopologySpreadScore(b *testing.B) { } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { + _, ctx := ktesting.NewTestContext(b) existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) state := framework.NewCycleState() - pl := plugintesting.SetupPlugin(b, podTopologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) + pl := plugintesting.SetupPlugin(ctx, b, podTopologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) p := pl.(*PodTopologySpread) - status := p.PreScore(context.Background(), state, tt.pod, filteredNodes) + status := p.PreScore(ctx, state, tt.pod, filteredNodes) if !status.IsSuccess() { b.Fatalf("unexpected error: %v", status) } @@ -1434,10 +1437,11 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) { client := fake.NewSimpleClientset( &v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}}, ) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) defer cancel() informerFactory := informers.NewSharedInformerFactory(client, 0) - f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), + f, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go index 6ff35783895..820bf1b24ff 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -58,7 +59,8 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) { client := fake.NewSimpleClientset( &v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}}, ) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) defer cancel() informerFactory := informers.NewSharedInformerFactory(client, 0) _ = informerFactory.Core().V1().Services().Lister() @@ -69,7 +71,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) { b.Errorf("error waiting for informer cache sync") } } - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory)) pl, err := New(nil, fh) if err != nil { b.Fatal(err) diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go index 69564fb2be7..cb8ceb3ca3d 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go @@ -28,9 +28,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -397,13 +397,14 @@ func TestSelectorSpreadScore(t *testing.T) { t.Run(test.name, func(t *testing.T) { nodes := makeNodeList(test.nodes) snapshot := cache.NewSnapshot(test.pods, nodes) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss) if err != nil { t.Errorf("error creating informerFactory: %+v", err) } - fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) + fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { t.Errorf("error creating new framework handle: %+v", err) } @@ -655,13 +656,14 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { t.Run(test.name, func(t *testing.T) { nodes := makeLabeledNodeList(labeledNodes) snapshot := cache.NewSnapshot(test.pods, nodes) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss) if err != nil { t.Errorf("error creating informerFactory: %+v", err) } - fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) + fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { t.Errorf("error creating new framework handle: %+v", err) } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go index 15889c40420..fca4262d815 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -228,12 +229,13 @@ func TestTaintTolerationScore(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() snapshot := cache.NewSnapshot(nil, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) p, _ := New(nil, fh) status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) diff --git a/pkg/scheduler/framework/plugins/testing/testing.go b/pkg/scheduler/framework/plugins/testing/testing.go index 70021bb36b3..2a525deb09f 100644 --- a/pkg/scheduler/framework/plugins/testing/testing.go +++ b/pkg/scheduler/framework/plugins/testing/testing.go @@ -23,7 +23,6 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -44,7 +43,7 @@ func SetupPluginWithInformers( ) framework.Plugin { objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...) informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0) - fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, + fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(sharedLister), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { @@ -62,12 +61,13 @@ func SetupPluginWithInformers( // SetupPlugin creates a plugin using a framework handle that includes // the provided sharedLister. func SetupPlugin( + ctx context.Context, tb testing.TB, pf frameworkruntime.PluginFactory, config runtime.Object, sharedLister framework.SharedLister, ) framework.Plugin { - fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, + fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(sharedLister)) if err != nil { tb.Fatalf("Failed creating framework runtime: %v", err) diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go index aea8e054d01..b17244a0491 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go @@ -28,9 +28,9 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -781,7 +781,8 @@ func TestVolumeBinding(t *testing.T) { for _, item := range table { t.Run(item.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) @@ -789,7 +790,7 @@ func TestVolumeBinding(t *testing.T) { runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), } - fh, err := runtime.NewFramework(nil, nil, wait.NeverStop, opts...) + fh, err := runtime.NewFramework(ctx, nil, nil, opts...) if err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index d248857e59e..ff68f2309a7 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -281,10 +281,12 @@ func TestDryRunPreemption(t *testing.T) { } informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) parallelism := parallelize.DefaultParallelism - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() fwk, err := st.NewFramework( - registeredPlugins, "", ctx.Done(), + ctx, + registeredPlugins, "", frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index f46400768ac..4380a404ef1 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -250,13 +250,13 @@ func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions { var _ framework.Framework = &frameworkImpl{} // NewFramework initializes plugins given the configuration and the registry. -func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) { - options := defaultFrameworkOptions(stopCh) +func NewFramework(ctx context.Context, r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) { + options := defaultFrameworkOptions(ctx.Done()) for _, opt := range opts { opt(&options) } - logger := klog.TODO() + logger := klog.FromContext(ctx) if options.logger != nil { logger = *options.logger } diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 43e6c0099a8..e611ac77d36 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -31,8 +31,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/component-base/metrics/testutil" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -405,7 +405,7 @@ var ( errInjectedFilterStatus = errors.New(injectFilterReason) ) -func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) { +func newFrameworkWithQueueSortAndBind(ctx context.Context, r Registry, profile config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) { if _, ok := r[queueSortPlugin]; !ok { r[queueSortPlugin] = newQueueSortPlugin } @@ -419,7 +419,7 @@ func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerPr if len(profile.Plugins.Bind.Enabled) == 0 { profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin}) } - return NewFramework(r, &profile, stopCh, opts...) + return NewFramework(ctx, r, &profile, opts...) } func TestInitFrameworkWithScorePlugins(t *testing.T) { @@ -460,9 +460,10 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: tt.plugins} - stopCh := make(chan struct{}) - defer close(stopCh) - _, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + _, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if tt.initErr && err == nil { t.Fatal("Framework initialization should fail") } @@ -514,11 +515,14 @@ func TestNewFrameworkErrors(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() profile := &config.KubeSchedulerProfile{ Plugins: tc.plugins, PluginConfig: tc.pluginCfg, } - _, err := NewFramework(registry, profile, wait.NeverStop) + _, err := NewFramework(ctx, registry, profile) if err == nil || !strings.Contains(err.Error(), tc.wantErr) { t.Errorf("Unexpected error, got %v, expect: %s", err, tc.wantErr) } @@ -826,9 +830,10 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + fw, err := NewFramework(ctx, registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}) if err != nil { if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) { t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr) @@ -993,9 +998,10 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { got := make(map[framework.ClusterEvent]sets.Set[string]) profile := config.KubeSchedulerProfile{Plugins: cfgPls} - stopCh := make(chan struct{}) - defer close(stopCh) - _, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh, WithClusterEventMap(got)) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + _, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, WithClusterEventMap(got)) if err != nil { t.Fatal(err) } @@ -1048,7 +1054,7 @@ func TestPreEnqueuePlugins(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: cfgPls} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -1165,9 +1171,9 @@ func TestRunPreScorePlugins(t *testing.T) { defer cancel() f, err := newFrameworkWithQueueSortAndBind( + ctx, r, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreScore: config.PluginSet{Enabled: enabled}}}, - ctx.Done(), ) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) @@ -1546,7 +1552,7 @@ func TestRunScorePlugins(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -1590,7 +1596,7 @@ func TestPreFilterPlugins(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -1724,9 +1730,9 @@ func TestRunPreFilterPlugins(t *testing.T) { defer cancel() f, err := newFrameworkWithQueueSortAndBind( + ctx, r, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, - ctx.Done(), ) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) @@ -1812,9 +1818,9 @@ func TestRunPreFilterExtensionRemovePod(t *testing.T) { defer cancel() f, err := newFrameworkWithQueueSortAndBind( + ctx, r, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, - ctx.Done(), ) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) @@ -1894,9 +1900,9 @@ func TestRunPreFilterExtensionAddPod(t *testing.T) { defer cancel() f, err := newFrameworkWithQueueSortAndBind( + ctx, r, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, - ctx.Done(), ) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) @@ -2100,7 +2106,7 @@ func TestFilterPlugins(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2224,7 +2230,7 @@ func TestPostFilterPlugins(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: cfgPls} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2373,7 +2379,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: cfgPls} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done(), WithPodNominator(podNominator)) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, WithPodNominator(podNominator)) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2530,7 +2536,7 @@ func TestPreBindPlugins(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: configPlugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2688,7 +2694,7 @@ func TestReservePlugins(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: configPlugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2814,7 +2820,7 @@ func TestPermitPlugins(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: configPlugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2984,23 +2990,25 @@ func TestRecordingMetrics(t *testing.T) { PostBind: pluginSet, } - stopCh := make(chan struct{}) - recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + + recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, ctx.Done()) profile := config.KubeSchedulerProfile{ PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore), SchedulerName: testProfileName, Plugins: plugins, } - f, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder)) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder)) if err != nil { - close(stopCh) + cancel() t.Fatalf("Failed to create framework for testing: %v", err) } tt.action(f) // Stop the goroutine which records metrics and ensure it's stopped. - close(stopCh) + cancel() <-recorder.IsStoppedCh // Try to clean up the metrics buffer again in case it's not empty. recorder.FlushMetrics() @@ -3096,16 +3104,17 @@ func TestRunBindPlugins(t *testing.T) { pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name}) } plugins := &config.Plugins{Bind: pluginSet} - stopCh := make(chan struct{}) - recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, ctx.Done()) profile := config.KubeSchedulerProfile{ SchedulerName: testProfileName, PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore), Plugins: plugins, } - fwk, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder)) + fwk, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder)) if err != nil { - close(stopCh) + cancel() t.Fatal(err) } @@ -3115,7 +3124,7 @@ func TestRunBindPlugins(t *testing.T) { } // Stop the goroutine which records metrics and ensure it's stopped. - close(stopCh) + cancel() <-recorder.IsStoppedCh // Try to clean up the metrics buffer again in case it's not empty. recorder.FlushMetrics() @@ -3160,7 +3169,7 @@ func TestPermitWaitDurationMetric(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: plugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -3216,7 +3225,7 @@ func TestWaitOnPermit(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: plugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -3267,9 +3276,10 @@ func TestListPlugins(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: tt.plugins} - stopCh := make(chan struct{}) - defer close(stopCh) - f, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } diff --git a/pkg/scheduler/profile/profile.go b/pkg/scheduler/profile/profile.go index b023b03f628..3846d9720df 100644 --- a/pkg/scheduler/profile/profile.go +++ b/pkg/scheduler/profile/profile.go @@ -18,6 +18,7 @@ limitations under the License. package profile import ( + "context" "errors" "fmt" @@ -34,24 +35,24 @@ import ( type RecorderFactory func(string) events.EventRecorder // newProfile builds a Profile for the given configuration. -func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory, - stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) { +func newProfile(ctx context.Context, cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory, + opts ...frameworkruntime.Option) (framework.Framework, error) { recorder := recorderFact(cfg.SchedulerName) opts = append(opts, frameworkruntime.WithEventRecorder(recorder)) - return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...) + return frameworkruntime.NewFramework(ctx, r, &cfg, opts...) } // Map holds frameworks indexed by scheduler name. type Map map[string]framework.Framework // NewMap builds the frameworks given by the configuration, indexed by name. -func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory, - stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) { +func NewMap(ctx context.Context, cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory, + opts ...frameworkruntime.Option) (Map, error) { m := make(Map) v := cfgValidator{m: m} for _, cfg := range cfgs { - p, err := newProfile(cfg, r, recorderFact, stopCh, opts...) + p, err := newProfile(ctx, cfg, r, recorderFact, opts...) if err != nil { return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err) } diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go index 018d42b29dd..6d0201bb24a 100644 --- a/pkg/scheduler/profile/profile_test.go +++ b/pkg/scheduler/profile/profile_test.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/events" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -246,9 +247,10 @@ func TestNewMap(t *testing.T) { } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + m, err := NewMap(ctx, tc.cfgs, fakeRegistry, nilRecorderFactory) if err := checkErr(err, tc.wantErr); err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 4de3aa51120..098731e9206 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -395,11 +395,11 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, 0) sched, err := New( + ctx, client, informerFactory, nil, profile.NewRecorderFactory(broadcaster), - ctx.Done(), WithProfiles( schedulerapi.KubeSchedulerProfile{SchedulerName: "match-node2", Plugins: &schedulerapi.Plugins{ @@ -526,11 +526,11 @@ func TestSchedulerGuaranteeNonNilNodeInSchedulingCycle(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, 0) sched, err := New( + ctx, client, informerFactory, nil, profile.NewRecorderFactory(broadcaster), - ctx.Done(), WithProfiles( schedulerapi.KubeSchedulerProfile{SchedulerName: fakeSchedulerName, Plugins: &schedulerapi.Plugins{ @@ -748,9 +748,9 @@ func TestSchedulerScheduleOne(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fwk, err := st.NewFramework(registerPluginFuncs, + fwk, err := st.NewFramework(ctx, + registerPluginFuncs, testSchedulerName, - ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName))) if err != nil { @@ -1223,10 +1223,11 @@ func TestSchedulerBinding(t *testing.T) { }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fwk, err := st.NewFramework([]st.RegisterPluginFunc{ - st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - }, "", ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{})) + fwk, err := st.NewFramework(ctx, + []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + }, "", frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{})) if err != nil { t.Fatal(err) } @@ -2229,7 +2230,8 @@ func TestSchedulerSchedulePod(t *testing.T) { } snapshot := internalcache.NewSnapshot(test.pods, nodes) fwk, err := st.NewFramework( - test.registerPlugins, "", ctx.Done(), + ctx, + test.registerPlugins, "", frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), @@ -2282,6 +2284,7 @@ func TestFindFitAllError(t *testing.T) { scheduler := makeScheduler(ctx, nodes) fwk, err := st.NewFramework( + ctx, []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), @@ -2289,7 +2292,6 @@ func TestFindFitAllError(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { @@ -2322,6 +2324,7 @@ func TestFindFitSomeError(t *testing.T) { scheduler := makeScheduler(ctx, nodes) fwk, err := st.NewFramework( + ctx, []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), @@ -2329,7 +2332,6 @@ func TestFindFitSomeError(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { @@ -2404,7 +2406,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fwk, err := st.NewFramework( - registerPlugins, "", ctx.Done(), + ctx, + registerPlugins, "", frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { @@ -2540,7 +2543,8 @@ func TestZeroRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fwk, err := st.NewFramework( - pluginRegistrations, "", ctx.Done(), + ctx, + pluginRegistrations, "", frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), @@ -2773,8 +2777,8 @@ func Test_prioritizeNodes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fwk, err := st.NewFramework( + ctx, test.pluginRegistrations, "", - ctx.Done(), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithClientSet(client), @@ -2888,13 +2892,13 @@ func TestFairEvaluationForNodes(t *testing.T) { sched := makeScheduler(ctx, nodes) fwk, err := st.NewFramework( + ctx, []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, "", - ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), ) if err != nil { @@ -2972,7 +2976,8 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } fwk, err := st.NewFramework( - registerPlugins, "", ctx.Done(), + ctx, + registerPlugins, "", frameworkruntime.WithClientSet(client), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), ) @@ -3123,9 +3128,9 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) fwk, _ := st.NewFramework( + ctx, fns, testSchedulerName, - ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithInformerFactory(informerFactory), diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ff298e8b98f..43d8b3c974e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -239,17 +239,14 @@ var defaultSchedulerOptions = schedulerOptions{ } // New returns a Scheduler -func New(client clientset.Interface, +func New(ctx context.Context, + client clientset.Interface, informerFactory informers.SharedInformerFactory, dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, recorderFactory profile.RecorderFactory, - stopCh <-chan struct{}, opts ...Option) (*Scheduler, error) { - stopEverything := stopCh - if stopEverything == nil { - stopEverything = wait.NeverStop - } + stopEverything := ctx.Done() options := defaultSchedulerOptions for _, opt := range opts { @@ -283,9 +280,9 @@ func New(client clientset.Interface, snapshot := internalcache.NewEmptySnapshot() clusterEventMap := make(map[framework.ClusterEvent]sets.Set[string]) - metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh) + metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything) - profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh, + profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), frameworkruntime.WithClientSet(client), frameworkruntime.WithKubeConfig(options.kubeConfig), diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 329eb6b1e56..dc9b2153ba9 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" + "k8s.io/klog/v2/ktesting" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" @@ -173,14 +174,15 @@ func TestSchedulerCreation(t *testing.T) { eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - stopCh := make(chan struct{}) - defer close(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() s, err := New( + ctx, client, informerFactory, nil, profile.NewRecorderFactory(eventBroadcaster), - stopCh, tc.opts..., ) @@ -284,7 +286,7 @@ func TestFailureHandler(t *testing.T) { queue.Delete(testPod) } - s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) + s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory) if err != nil { t.Fatal(err) } @@ -359,7 +361,7 @@ func TestFailureHandler_NodeNotFound(t *testing.T) { } } - s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) + s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory) if err != nil { t.Fatal(err) } @@ -398,7 +400,7 @@ func TestFailureHandler_PodAlreadyBound(t *testing.T) { // Add node to schedulerCache no matter it's deleted in API server or not. schedulerCache.AddNode(&nodeFoo) - s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) + s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory) if err != nil { t.Fatal(err) } @@ -436,14 +438,15 @@ func TestWithPercentageOfNodesToScore(t *testing.T) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - stopCh := make(chan struct{}) - defer close(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() sched, err := New( + ctx, client, informerFactory, nil, profile.NewRecorderFactory(eventBroadcaster), - stopCh, WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig), ) if err != nil { @@ -483,16 +486,16 @@ func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v return nil } -func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue internalqueue.SchedulingQueue, +func initScheduler(ctx context.Context, cache internalcache.Cache, queue internalqueue.SchedulingQueue, client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) { registerPluginFuncs := []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - fwk, err := st.NewFramework(registerPluginFuncs, + fwk, err := st.NewFramework(ctx, + registerPluginFuncs, testSchedulerName, - stop, frameworkruntime.WithClientSet(client), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), @@ -504,7 +507,7 @@ func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue intern s := &Scheduler{ Cache: cache, client: client, - StopEverything: stop, + StopEverything: ctx.Done(), SchedulingQueue: queue, Profiles: profile.Map{testSchedulerName: fwk}, } @@ -591,9 +594,10 @@ func TestInitPluginsWithIndexers(t *testing.T) { st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), ) - stopCh := make(chan struct{}) - defer close(stopCh) - _, err := st.NewFramework(registerPluginFuncs, "test", stopCh, frameworkruntime.WithInformerFactory(fakeInformerFactory)) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + _, err := st.NewFramework(ctx, registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory)) if len(tt.wantErr) > 0 { if err == nil || !strings.Contains(err.Error(), tt.wantErr) { diff --git a/pkg/scheduler/testing/framework_helpers.go b/pkg/scheduler/testing/framework_helpers.go index 48c3a6e75fd..a3097702885 100644 --- a/pkg/scheduler/testing/framework_helpers.go +++ b/pkg/scheduler/testing/framework_helpers.go @@ -17,6 +17,8 @@ limitations under the License. package testing import ( + "context" + "k8s.io/apimachinery/pkg/runtime/schema" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -28,7 +30,7 @@ import ( var configDecoder = scheme.Codecs.UniversalDecoder() // NewFramework creates a Framework from the register functions and options. -func NewFramework(fns []RegisterPluginFunc, profileName string, stopCh <-chan struct{}, opts ...runtime.Option) (framework.Framework, error) { +func NewFramework(ctx context.Context, fns []RegisterPluginFunc, profileName string, opts ...runtime.Option) (framework.Framework, error) { registry := runtime.Registry{} profile := &schedulerapi.KubeSchedulerProfile{ SchedulerName: profileName, @@ -37,7 +39,7 @@ func NewFramework(fns []RegisterPluginFunc, profileName string, stopCh <-chan st for _, f := range fns { f(®istry, profile) } - return runtime.NewFramework(registry, profile, stopCh, opts...) + return runtime.NewFramework(ctx, registry, profile, opts...) } // RegisterPluginFunc is a function signature used in method RegisterFilterPlugin() diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index bfd2e43f010..566495040a7 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -98,11 +98,11 @@ func setupWithServerSetup(t *testing.T, serverSetup framework.TestServerSetup) ( }) sched, err := scheduler.New( + ctx, clientSet, informers, nil, profile.NewRecorderFactory(eventBroadcaster), - ctx.Done(), ) if err != nil { t.Fatalf("Couldn't create scheduler: %v", err) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index b0d188b04de..359459e6846 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -81,11 +81,11 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf evtBroadcaster.StartRecordingToSink(ctx.Done()) sched, err := scheduler.New( + ctx, clientSet, informerFactory, nil, profile.NewRecorderFactory(evtBroadcaster), - ctx.Done(), scheduler.WithKubeConfig(kubeConfig), scheduler.WithProfiles(cfg.Profiles...), scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore), @@ -446,11 +446,11 @@ func InitTestSchedulerWithOptions( opts = append(opts, scheduler.WithKubeConfig(testCtx.KubeConfig)) testCtx.Scheduler, err = scheduler.New( + ctx, testCtx.ClientSet, testCtx.InformerFactory, testCtx.DynInformerFactory, profile.NewRecorderFactory(eventBroadcaster), - ctx.Done(), opts..., )