diff --git a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go
index 44d7b83d19f..f09a9d0cd84 100644
--- a/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go
+++ b/pkg/scheduler/framework/plugins/defaultbinder/default_binder_test.go
@@ -25,6 +25,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes/fake"
 	clienttesting "k8s.io/client-go/testing"
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -65,7 +66,7 @@ func TestDefaultBinder(t *testing.T) {
 				return true, gotBinding, nil
 			})
 
-			fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithClientSet(client))
+			fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithClientSet(client))
 			if err != nil {
 				t.Fatal(err)
 			}
diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go
index 72e5feae55b..8995f21f982 100644
--- a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go
+++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -334,7 +335,7 @@ func TestImageLocalityPriority(t *testing.T) {
 			snapshot := cache.NewSnapshot(nil, test.nodes)
 			state := framework.NewCycleState()
 
-			fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
 
 			p, _ := New(nil, fh)
 			var gotList framework.NodeScoreList
diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go
index e2ab85bcab1..37e66275f85 100644
--- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go
+++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go
@@ -24,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -1133,7 +1134,7 @@ func TestNodeAffinityPriority(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
-			fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
+			fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
 			p, err := New(&test.args, fh)
 			if err != nil {
 				t.Fatalf("Creating plugin: %v", err)
diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go
index 4376f8025de..694fae89355 100644
--- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go
+++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go
@@ -24,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -350,7 +351,7 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			snapshot := cache.NewSnapshot(test.pods, test.nodes)
-			fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
 			p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{})
 			for i := range test.nodes {
 				hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)
diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go
index 1d5360285ad..e2d6f15fdc3 100644
--- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go
+++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go
@@ -24,6 +24,7 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/component-base/featuregate"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -757,7 +758,7 @@ func TestFitScore(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
-			fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
 			args := test.nodeResourcesFitArgs
 			p, err := NewFit(&args, fh, plfeature.Features{})
 			if err != nil {
diff --git a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go
index 5f8f3097d50..28443f392a6 100644
--- a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go
+++ b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -376,7 +377,7 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
-			fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
 
 			p, err := NewFit(
 				&config.NodeResourcesFitArgs{
diff --git a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go
index 5226ec15342..7f7ace6105a 100644
--- a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go
+++ b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -332,7 +333,7 @@ func TestMostAllocatedScoringStrategy(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit( &config.NodeResourcesFitArgs{ diff --git a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go index f99c6b7247b..a487fc57adc 100644 --- a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" @@ -110,7 +111,7 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.existingPods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit(&config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ @@ -301,7 +302,7 @@ func TestResourceBinPackingSingleExtended(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.pods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) args := config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ Type: config.RequestedToCapacityRatio, @@ -521,7 +522,7 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := cache.NewSnapshot(test.pods, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) args := config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go index b9089957a51..97e3e9e3a09 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -497,7 +498,7 @@ func TestPreScoreStateEmptyNodes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer 
cancel() informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0) - f, err := frameworkruntime.NewFramework(nil, nil, + f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { @@ -1299,7 +1300,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() informerFactory := informers.NewSharedInformerFactory(client, 0) - f, err := frameworkruntime.NewFramework(nil, nil, + f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go index 6c2c7a0962c..509b11e54a4 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_perf_test.go @@ -21,6 +21,7 @@ import ( "testing" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -69,7 +70,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) { b.Errorf("error waiting for informer cache sync") } } - fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory)) + fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory)) pl, err := New(nil, fh) if err != nil { b.Fatal(err) diff --git a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go index 12d425391bc..3572197ad73 100644 --- a/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go +++ b/pkg/scheduler/framework/plugins/selectorspread/selector_spread_test.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -402,7 +403,7 @@ func TestSelectorSpreadScore(t *testing.T) { if err != nil { t.Errorf("error creating informerFactory: %+v", err) } - fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) + fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { t.Errorf("error creating new framework handle: %+v", err) } @@ -660,7 +661,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { if err != nil { t.Errorf("error creating informerFactory: %+v", err) } - fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) + fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), 
frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { t.Errorf("error creating new framework handle: %+v", err) } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go index 817df65f9b0..2e5e6d325f4 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -230,7 +231,7 @@ func TestTaintTolerationScore(t *testing.T) { t.Run(test.name, func(t *testing.T) { state := framework.NewCycleState() snapshot := cache.NewSnapshot(nil, test.nodes) - fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot)) + fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) p, _ := New(nil, fh) status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes) diff --git a/pkg/scheduler/framework/plugins/testing/testing.go b/pkg/scheduler/framework/plugins/testing/testing.go index e42c713915c..70021bb36b3 100644 --- a/pkg/scheduler/framework/plugins/testing/testing.go +++ b/pkg/scheduler/framework/plugins/testing/testing.go @@ -23,6 +23,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -43,7 +44,7 @@ func SetupPluginWithInformers( ) framework.Plugin { objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...) informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0) - fh, err := frameworkruntime.NewFramework(nil, nil, + fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(sharedLister), frameworkruntime.WithInformerFactory(informerFactory)) if err != nil { @@ -66,7 +67,7 @@ func SetupPlugin( config runtime.Object, sharedLister framework.SharedLister, ) framework.Plugin { - fh, err := frameworkruntime.NewFramework(nil, nil, + fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(sharedLister)) if err != nil { tb.Fatalf("Failed creating framework runtime: %v", err) diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go index 8c5c6fe8c05..89ca9614afa 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go @@ -27,6 +27,7 @@ import ( storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -600,7 +601,7 @@ func TestVolumeBinding(t *testing.T) { runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), } - fh, err := runtime.NewFramework(nil, nil, opts...) 
+			fh, err := runtime.NewFramework(nil, nil, wait.NeverStop, opts...)
 			if err != nil {
 				t.Fatal(err)
 			}
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 3fa96c2530b..b819e308584 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -227,9 +227,9 @@ func WithCaptureProfile(c CaptureProfile) Option {
 	}
 }
 
-func defaultFrameworkOptions() frameworkOptions {
+func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
 	return frameworkOptions{
-		metricsRecorder: newMetricsRecorder(1000, time.Second),
+		metricsRecorder: newMetricsRecorder(1000, time.Second, stopCh),
 		clusterEventMap: make(map[framework.ClusterEvent]sets.String),
 		parallelizer:    parallelize.NewParallelizer(parallelize.DefaultParallelism),
 	}
@@ -245,8 +245,8 @@ func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
 var _ framework.Framework = &frameworkImpl{}
 
 // NewFramework initializes plugins given the configuration and the registry.
-func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
-	options := defaultFrameworkOptions()
+func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) {
+	options := defaultFrameworkOptions(stopCh)
 	for _, opt := range opts {
 		opt(&options)
 	}
diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go
index 0d3ab4c8a39..a26afe96af1 100644
--- a/pkg/scheduler/framework/runtime/framework_test.go
+++ b/pkg/scheduler/framework/runtime/framework_test.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -391,7 +392,7 @@ func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerPr
 	if len(profile.Plugins.Bind.Enabled) == 0 {
 		profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin})
 	}
-	return NewFramework(r, &profile, opts...)
+	return NewFramework(r, &profile, wait.NeverStop, opts...)
 }
 
 func TestInitFrameworkWithScorePlugins(t *testing.T) {
@@ -488,7 +489,7 @@ func TestNewFrameworkErrors(t *testing.T) {
 				Plugins:      tc.plugins,
 				PluginConfig: tc.pluginCfg,
 			}
-			_, err := NewFramework(registry, profile)
+			_, err := NewFramework(registry, profile, wait.NeverStop)
 			if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
 				t.Errorf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
 			}
@@ -796,7 +797,7 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins})
+			fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, wait.NeverStop)
 			if err != nil {
 				if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) {
 					t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
@@ -2228,7 +2229,9 @@ func TestRecordingMetrics(t *testing.T) {
 				Bind:      pluginSet,
 				PostBind:  pluginSet,
 			}
-			recorder := newMetricsRecorder(100, time.Nanosecond)
+
+			stopCh := make(chan struct{})
+			recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
 			profile := config.KubeSchedulerProfile{
 				SchedulerName: testProfileName,
 				Plugins:       plugins,
@@ -2241,7 +2244,7 @@ func TestRecordingMetrics(t *testing.T) {
 			tt.action(f)
 
 			// Stop the goroutine which records metrics and ensure it's stopped.
-			close(recorder.stopCh)
+			close(stopCh)
 			<-recorder.isStoppedCh
 			// Try to clean up the metrics buffer again in case it's not empty.
 			recorder.flushMetrics()
@@ -2337,7 +2340,8 @@ func TestRunBindPlugins(t *testing.T) {
 				pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name})
 			}
 			plugins := &config.Plugins{Bind: pluginSet}
-			recorder := newMetricsRecorder(100, time.Nanosecond)
+			stopCh := make(chan struct{})
+			recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
 			profile := config.KubeSchedulerProfile{
 				SchedulerName: testProfileName,
 				Plugins:       plugins,
@@ -2353,7 +2357,7 @@ func TestRunBindPlugins(t *testing.T) {
 			}
 
 			// Stop the goroutine which records metrics and ensure it's stopped.
-			close(recorder.stopCh)
+			close(stopCh)
 			<-recorder.isStoppedCh
 			// Try to clean up the metrics buffer again in case it's not empty.
 			recorder.flushMetrics()
diff --git a/pkg/scheduler/framework/runtime/metrics_recorder.go b/pkg/scheduler/framework/runtime/metrics_recorder.go
index ef98c336eba..1204f2e9ac3 100644
--- a/pkg/scheduler/framework/runtime/metrics_recorder.go
+++ b/pkg/scheduler/framework/runtime/metrics_recorder.go
@@ -43,18 +43,18 @@ type metricsRecorder struct {
 
 	// stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only
 	// used in tests.
-	stopCh chan struct{}
+	stopCh <-chan struct{}
 	// isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
 	// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
 	isStoppedCh chan struct{}
 }
 
-func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder {
+func newMetricsRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *metricsRecorder {
 	recorder := &metricsRecorder{
 		bufferCh:    make(chan *frameworkMetric, bufferSize),
 		bufferSize:  bufferSize,
 		interval:    interval,
-		stopCh:      make(chan struct{}),
+		stopCh:      stopCh,
 		isStoppedCh: make(chan struct{}),
 	}
 	go recorder.run()
diff --git a/pkg/scheduler/profile/profile.go b/pkg/scheduler/profile/profile.go
index 1457e6b0bf4..64857475402 100644
--- a/pkg/scheduler/profile/profile.go
+++ b/pkg/scheduler/profile/profile.go
@@ -35,10 +35,10 @@ type RecorderFactory func(string) events.EventRecorder
 
 // newProfile builds a Profile for the given configuration.
 func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
-	opts ...frameworkruntime.Option) (framework.Framework, error) {
+	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
 	recorder := recorderFact(cfg.SchedulerName)
 	opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
-	return frameworkruntime.NewFramework(r, &cfg, opts...)
+	return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
 }
 
 // Map holds frameworks indexed by scheduler name.
@@ -46,12 +46,12 @@ type Map map[string]framework.Framework
 
 // NewMap builds the frameworks given by the configuration, indexed by name.
 func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
-	opts ...frameworkruntime.Option) (Map, error) {
+	stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
 	m := make(Map)
 	v := cfgValidator{m: m}
 
 	for _, cfg := range cfgs {
-		p, err := newProfile(cfg, r, recorderFact, opts...)
+		p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
 		if err != nil {
 			return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
 		}
diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go
index 6716ec0a288..018d42b29dd 100644
--- a/pkg/scheduler/profile/profile_test.go
+++ b/pkg/scheduler/profile/profile_test.go
@@ -246,7 +246,9 @@ func TestNewMap(t *testing.T) {
 	}
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory)
+			stopCh := make(chan struct{})
+			defer close(stopCh)
+			m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory, stopCh)
 			if err := checkErr(err, tc.wantErr); err != nil {
 				t.Fatal(err)
 			}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 0559f393bdc..8d9cc2c2763 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -278,7 +278,7 @@ func New(client clientset.Interface,
 	snapshot := internalcache.NewEmptySnapshot()
 	clusterEventMap := make(map[framework.ClusterEvent]sets.String)
 
-	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory,
+	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
 		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
 		frameworkruntime.WithClientSet(client),
 		frameworkruntime.WithKubeConfig(options.kubeConfig),
diff --git a/pkg/scheduler/testing/framework_helpers.go b/pkg/scheduler/testing/framework_helpers.go
index 904e7f2e173..66a6eb56167 100644
--- a/pkg/scheduler/testing/framework_helpers.go
+++ b/pkg/scheduler/testing/framework_helpers.go
@@ -18,6 +18,7 @@ package testing
 
 import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kube-scheduler/config/v1beta2"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
@@ -37,7 +38,7 @@ func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime.
 	for _, f := range fns {
 		f(&registry, profile)
 	}
-	return runtime.NewFramework(registry, profile, opts...)
+	return runtime.NewFramework(registry, profile, wait.NeverStop, opts...)
 }
 
 // RegisterPluginFunc is a function signature used in method RegisterFilterPlugin()
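
Reviewer note, not part of the patch: the sketch below summarizes the two caller patterns the new stopCh parameter enables, assuming only the NewFramework signature shown above. The package name and test name are hypothetical and exist only for illustration.

// Hypothetical usage sketch; the package and test names are made up.
package stopchannelsketch

import (
	"testing"

	"k8s.io/apimachinery/pkg/util/wait"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

func TestNewFrameworkStopChannelUsage(t *testing.T) {
	// Pattern 1: callers that never need to tear down the metrics-flushing
	// goroutine pass wait.NeverStop, as most plugin tests in this patch do.
	if _, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop); err != nil {
		t.Fatal(err)
	}

	// Pattern 2: callers that want the goroutine to exit own the stop channel
	// and close it on shutdown, mirroring profile_test.go and
	// TestRecordingMetrics above.
	stopCh := make(chan struct{})
	defer close(stopCh)
	if _, err := frameworkruntime.NewFramework(nil, nil, stopCh); err != nil {
		t.Fatal(err)
	}
}

The design point is that the recorder no longer allocates a private channel it can never receive a close on outside tests; the lifetime of its goroutine is now tied to the stop channel the scheduler (or test) already owns.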