Merge pull request #110148 from wojtek-t/metrics_recorder_shutdown

Clear shutdown of scheduler metrics recorder

Commit: aa49dffc7f
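
In short: runtime.NewFramework now takes a stop channel as a required argument and hands it down to the scheduler metrics recorder, so the recorder's background flushing goroutine shuts down with the scheduler instead of living for the whole process. Tests with no meaningful lifetime pass wait.NeverStop; long-lived callers pass the channel they close on shutdown. A minimal sketch of the call pattern after this change (nil registry and profile, as in the tests below; not a complete scheduler setup):

    package example

    import (
        "k8s.io/apimachinery/pkg/util/wait"
        frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    )

    func buildFrameworks() error {
        // Tests that never tear the framework down keep using wait.NeverStop.
        if _, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop); err != nil {
            return err
        }

        // Long-lived callers pass a channel they close on shutdown; closing it
        // stops the metrics recorder's flushing goroutine.
        stopCh := make(chan struct{})
        defer close(stopCh)
        if _, err := frameworkruntime.NewFramework(nil, nil, stopCh); err != nil {
            return err
        }
        return nil
    }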
@@ -25,6 +25,7 @@ import (
 v1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/client-go/kubernetes/fake"
 clienttesting "k8s.io/client-go/testing"
 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -65,7 +66,7 @@ func TestDefaultBinder(t *testing.T) {
 return true, gotBinding, nil
 })

-fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithClientSet(client))
+fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithClientSet(client))
 if err != nil {
 t.Fatal(err)
 }

@@ -25,6 +25,7 @@ import (
 "github.com/google/go-cmp/cmp"
 v1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -334,7 +335,7 @@ func TestImageLocalityPriority(t *testing.T) {
 snapshot := cache.NewSnapshot(nil, test.nodes)

 state := framework.NewCycleState()
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))

 p, _ := New(nil, fh)
 var gotList framework.NodeScoreList

@@ -24,6 +24,7 @@ import (
 v1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/util/sets"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -1133,7 +1134,7 @@ func TestNodeAffinityPriority(t *testing.T) {
 for _, test := range tests {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
 p, err := New(&test.args, fh)
 if err != nil {
 t.Fatalf("Creating plugin: %v", err)

@@ -24,6 +24,7 @@ import (
 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/resource"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -350,7 +351,7 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
 for _, test := range tests {
 t.Run(test.name, func(t *testing.T) {
 snapshot := cache.NewSnapshot(test.pods, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
 p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{})
 for i := range test.nodes {
 hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)

@@ -24,6 +24,7 @@ import (

 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/resource"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/apiserver/pkg/util/feature"
 "k8s.io/component-base/featuregate"
 featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -757,7 +758,7 @@ func TestFitScore(t *testing.T) {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
 snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
 args := test.nodeResourcesFitArgs
 p, err := NewFit(&args, fh, plfeature.Features{})
 if err != nil {

@@ -23,6 +23,7 @@ import (
 "github.com/google/go-cmp/cmp"
 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/util/validation/field"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -376,7 +377,7 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
 snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))

 p, err := NewFit(
 &config.NodeResourcesFitArgs{

@@ -23,6 +23,7 @@ import (
 "github.com/google/go-cmp/cmp"
 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/util/validation/field"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -332,7 +333,7 @@ func TestMostAllocatedScoringStrategy(t *testing.T) {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
 snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))

 p, err := NewFit(
 &config.NodeResourcesFitArgs{

@@ -25,6 +25,7 @@ import (
 "github.com/stretchr/testify/assert"
 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/util/validation/field"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -110,7 +111,7 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
 snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))

 p, err := NewFit(&config.NodeResourcesFitArgs{
 ScoringStrategy: &config.ScoringStrategy{
@@ -301,7 +302,7 @@ func TestResourceBinPackingSingleExtended(t *testing.T) {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
 snapshot := cache.NewSnapshot(test.pods, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
 args := config.NodeResourcesFitArgs{
 ScoringStrategy: &config.ScoringStrategy{
 Type: config.RequestedToCapacityRatio,
@@ -521,7 +522,7 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
 snapshot := cache.NewSnapshot(test.pods, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))

 args := config.NodeResourcesFitArgs{
 ScoringStrategy: &config.ScoringStrategy{

@@ -27,6 +27,7 @@ import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/apimachinery/pkg/util/sets"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/client-go/informers"
 "k8s.io/client-go/kubernetes/fake"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -497,7 +498,7 @@ func TestPreScoreStateEmptyNodes(t *testing.T) {
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
 informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0)
-f, err := frameworkruntime.NewFramework(nil, nil,
+f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
 frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)),
 frameworkruntime.WithInformerFactory(informerFactory))
 if err != nil {
@@ -1299,7 +1300,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
 informerFactory := informers.NewSharedInformerFactory(client, 0)
-f, err := frameworkruntime.NewFramework(nil, nil,
+f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
 frameworkruntime.WithSnapshotSharedLister(snapshot),
 frameworkruntime.WithInformerFactory(informerFactory))
 if err != nil {

@@ -21,6 +21,7 @@ import (
 "testing"

 v1 "k8s.io/api/core/v1"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/client-go/informers"
 "k8s.io/client-go/kubernetes/fake"
 "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -69,7 +70,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
 b.Errorf("error waiting for informer cache sync")
 }
 }
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
 pl, err := New(nil, fh)
 if err != nil {
 b.Fatal(err)

@@ -28,6 +28,7 @@ import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/apimachinery/pkg/runtime/schema"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/client-go/informers"
 clientsetfake "k8s.io/client-go/kubernetes/fake"
 "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -402,7 +403,7 @@ func TestSelectorSpreadScore(t *testing.T) {
 if err != nil {
 t.Errorf("error creating informerFactory: %+v", err)
 }
-fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
+fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
 if err != nil {
 t.Errorf("error creating new framework handle: %+v", err)
 }
@@ -660,7 +661,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
 if err != nil {
 t.Errorf("error creating informerFactory: %+v", err)
 }
-fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
+fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
 if err != nil {
 t.Errorf("error creating new framework handle: %+v", err)
 }

@@ -23,6 +23,7 @@ import (

 v1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kubernetes/pkg/scheduler/framework"
 "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -230,7 +231,7 @@ func TestTaintTolerationScore(t *testing.T) {
 t.Run(test.name, func(t *testing.T) {
 state := framework.NewCycleState()
 snapshot := cache.NewSnapshot(nil, test.nodes)
-fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
+fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))

 p, _ := New(nil, fh)
 status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)

@@ -23,6 +23,7 @@ import (
 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/client-go/informers"
 "k8s.io/client-go/kubernetes/fake"
 "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -43,7 +44,7 @@ func SetupPluginWithInformers(
 ) framework.Plugin {
 objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...)
 informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
-fh, err := frameworkruntime.NewFramework(nil, nil,
+fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
 frameworkruntime.WithSnapshotSharedLister(sharedLister),
 frameworkruntime.WithInformerFactory(informerFactory))
 if err != nil {
@@ -66,7 +67,7 @@ func SetupPlugin(
 config runtime.Object,
 sharedLister framework.SharedLister,
 ) framework.Plugin {
-fh, err := frameworkruntime.NewFramework(nil, nil,
+fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
 frameworkruntime.WithSnapshotSharedLister(sharedLister))
 if err != nil {
 tb.Fatalf("Failed creating framework runtime: %v", err)

@@ -27,6 +27,7 @@ import (
 storagev1 "k8s.io/api/storage/v1"
 "k8s.io/apimachinery/pkg/api/resource"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/client-go/informers"
 "k8s.io/client-go/kubernetes/fake"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -600,7 +601,7 @@ func TestVolumeBinding(t *testing.T) {
 runtime.WithClientSet(client),
 runtime.WithInformerFactory(informerFactory),
 }
-fh, err := runtime.NewFramework(nil, nil, opts...)
+fh, err := runtime.NewFramework(nil, nil, wait.NeverStop, opts...)
 if err != nil {
 t.Fatal(err)
 }

@@ -227,9 +227,9 @@ func WithCaptureProfile(c CaptureProfile) Option {
 }
 }

-func defaultFrameworkOptions() frameworkOptions {
+func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
 return frameworkOptions{
-metricsRecorder: newMetricsRecorder(1000, time.Second),
+metricsRecorder: newMetricsRecorder(1000, time.Second, stopCh),
 clusterEventMap: make(map[framework.ClusterEvent]sets.String),
 parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
 }
@@ -245,8 +245,8 @@ func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
 var _ framework.Framework = &frameworkImpl{}

 // NewFramework initializes plugins given the configuration and the registry.
-func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
-options := defaultFrameworkOptions()
+func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) {
+options := defaultFrameworkOptions(stopCh)
 for _, opt := range opts {
 opt(&options)
 }

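Design note: the stop channel becomes a positional parameter on NewFramework and defaultFrameworkOptions rather than another functional Option, so every constructor call has to state a lifetime for the recorder explicitly. For tests that do want the recorder torn down with the test, a small helper along these lines would work (hypothetical, not part of this change):

    package example

    import (
        "testing"

        "k8s.io/kubernetes/pkg/scheduler/apis/config"
        "k8s.io/kubernetes/pkg/scheduler/framework"
        frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    )

    // newTestFramework is illustrative only: it builds a framework whose stop
    // channel is closed when the test finishes, so the recorder goroutine exits.
    func newTestFramework(t *testing.T, r frameworkruntime.Registry, p *config.KubeSchedulerProfile, opts ...frameworkruntime.Option) framework.Framework {
        t.Helper()
        stopCh := make(chan struct{})
        t.Cleanup(func() { close(stopCh) })
        fw, err := frameworkruntime.NewFramework(r, p, stopCh, opts...)
        if err != nil {
            t.Fatalf("creating framework: %v", err)
        }
        return fw
    }
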
@@ -31,6 +31,7 @@ import (
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/apimachinery/pkg/types"
 "k8s.io/apimachinery/pkg/util/sets"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/component-base/metrics/testutil"
 "k8s.io/kubernetes/pkg/scheduler/apis/config"
 "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -391,7 +392,7 @@ func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerPr
 if len(profile.Plugins.Bind.Enabled) == 0 {
 profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin})
 }
-return NewFramework(r, &profile, opts...)
+return NewFramework(r, &profile, wait.NeverStop, opts...)
 }

 func TestInitFrameworkWithScorePlugins(t *testing.T) {
@@ -488,7 +489,7 @@ func TestNewFrameworkErrors(t *testing.T) {
 Plugins: tc.plugins,
 PluginConfig: tc.pluginCfg,
 }
-_, err := NewFramework(registry, profile)
+_, err := NewFramework(registry, profile, wait.NeverStop)
 if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
 t.Errorf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
 }
@@ -796,7 +797,7 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) {

 for _, tc := range tests {
 t.Run(tc.name, func(t *testing.T) {
-fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins})
+fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, wait.NeverStop)
 if err != nil {
 if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) {
 t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
@@ -2228,7 +2229,9 @@ func TestRecordingMetrics(t *testing.T) {
 Bind: pluginSet,
 PostBind: pluginSet,
 }
-recorder := newMetricsRecorder(100, time.Nanosecond)
+
+stopCh := make(chan struct{})
+recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
 profile := config.KubeSchedulerProfile{
 SchedulerName: testProfileName,
 Plugins: plugins,
@@ -2241,7 +2244,7 @@ func TestRecordingMetrics(t *testing.T) {
 tt.action(f)

 // Stop the goroutine which records metrics and ensure it's stopped.
-close(recorder.stopCh)
+close(stopCh)
 <-recorder.isStoppedCh
 // Try to clean up the metrics buffer again in case it's not empty.
 recorder.flushMetrics()
@@ -2337,7 +2340,8 @@ func TestRunBindPlugins(t *testing.T) {
 pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name})
 }
 plugins := &config.Plugins{Bind: pluginSet}
-recorder := newMetricsRecorder(100, time.Nanosecond)
+stopCh := make(chan struct{})
+recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
 profile := config.KubeSchedulerProfile{
 SchedulerName: testProfileName,
 Plugins: plugins,
@@ -2353,7 +2357,7 @@ func TestRunBindPlugins(t *testing.T) {
 }

 // Stop the goroutine which records metrics and ensure it's stopped.
-close(recorder.stopCh)
+close(stopCh)
 <-recorder.isStoppedCh
 // Try to clean up the metrics buffer again in case it's not empty.
 recorder.flushMetrics()

@@ -43,18 +43,18 @@ type metricsRecorder struct {

 // stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only
 // used in tests.
-stopCh chan struct{}
+stopCh <-chan struct{}
 // isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
 // the metric flushing goroutine is stopped so that tests can collect metrics for verification.
 isStoppedCh chan struct{}
 }

-func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder {
+func newMetricsRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *metricsRecorder {
 recorder := &metricsRecorder{
 bufferCh: make(chan *frameworkMetric, bufferSize),
 bufferSize: bufferSize,
 interval: interval,
-stopCh: make(chan struct{}),
+stopCh: stopCh,
 isStoppedCh: make(chan struct{}),
 }
 go recorder.run()

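The recorder used to create and own its stopCh (tests closed recorder.stopCh directly); now the caller supplies it, which is why the field narrows to a receive-only <-chan struct{}. The run loop itself is not touched by this diff; as a rough sketch of the pattern the two channels drive (assumed shape, not the actual metrics_recorder code):

    package example

    import "time"

    // flushLoop: flush on a fixed interval until the caller-owned stopCh is
    // closed, then signal isStoppedCh so tests can wait for a clean shutdown.
    func flushLoop(interval time.Duration, flush func(), stopCh <-chan struct{}, isStoppedCh chan<- struct{}) {
        for {
            select {
            case <-stopCh:
                close(isStoppedCh)
                return
            default:
            }
            time.Sleep(interval)
            flush()
        }
    }
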
@@ -35,10 +35,10 @@ type RecorderFactory func(string) events.EventRecorder

 // newProfile builds a Profile for the given configuration.
 func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
-opts ...frameworkruntime.Option) (framework.Framework, error) {
+stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
 recorder := recorderFact(cfg.SchedulerName)
 opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
-return frameworkruntime.NewFramework(r, &cfg, opts...)
+return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
 }

 // Map holds frameworks indexed by scheduler name.
@@ -46,12 +46,12 @@ type Map map[string]framework.Framework

 // NewMap builds the frameworks given by the configuration, indexed by name.
 func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
-opts ...frameworkruntime.Option) (Map, error) {
+stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
 m := make(Map)
 v := cfgValidator{m: m}

 for _, cfg := range cfgs {
-p, err := newProfile(cfg, r, recorderFact, opts...)
+p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
 if err != nil {
 return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
 }

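Threading the channel through profile.NewMap and newProfile means every per-profile framework, and therefore every metrics recorder, shares the scheduler's lifetime. A hedged usage sketch (argument values are placeholders, not taken from this diff):

    package example

    import (
        "k8s.io/kubernetes/pkg/scheduler/apis/config"
        frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
        "k8s.io/kubernetes/pkg/scheduler/profile"
    )

    // buildProfiles forwards a scheduler-lifetime stop channel into NewMap,
    // which passes it to NewFramework for each configured profile.
    func buildProfiles(cfgs []config.KubeSchedulerProfile, registry frameworkruntime.Registry, rf profile.RecorderFactory, stopCh <-chan struct{}) (profile.Map, error) {
        return profile.NewMap(cfgs, registry, rf, stopCh)
    }
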
@@ -246,7 +246,9 @@ func TestNewMap(t *testing.T) {
 }
 for _, tc := range cases {
 t.Run(tc.name, func(t *testing.T) {
-m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory)
+stopCh := make(chan struct{})
+defer close(stopCh)
+m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory, stopCh)
 if err := checkErr(err, tc.wantErr); err != nil {
 t.Fatal(err)
 }

@@ -278,7 +278,7 @@ func New(client clientset.Interface,
 snapshot := internalcache.NewEmptySnapshot()
 clusterEventMap := make(map[framework.ClusterEvent]sets.String)

-profiles, err := profile.NewMap(options.profiles, registry, recorderFactory,
+profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
 frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
 frameworkruntime.WithClientSet(client),
 frameworkruntime.WithKubeConfig(options.kubeConfig),

@@ -18,6 +18,7 @@ package testing

 import (
 "k8s.io/apimachinery/pkg/runtime/schema"
+"k8s.io/apimachinery/pkg/util/wait"
 "k8s.io/kube-scheduler/config/v1beta2"
 schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
@@ -37,7 +38,7 @@ func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime.
 for _, f := range fns {
 f(&registry, profile)
 }
-return runtime.NewFramework(registry, profile, opts...)
+return runtime.NewFramework(registry, profile, wait.NeverStop, opts...)
 }

 // RegisterPluginFunc is a function signature used in method RegisterFilterPlugin()

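The shared test helper keeps passing wait.NeverStop, so frameworks built through it never stop their recorder; tests that assert on recorder shutdown (TestRecordingMetrics and TestRunBindPlugins above) construct the recorder with a channel they own instead. For reference, wait.NeverStop from k8s.io/apimachinery/pkg/util/wait is essentially a receive-only channel that is never closed:

    package example

    // NeverStop mirrors wait.NeverStop: a channel that is never closed, so a
    // recorder wired to it keeps flushing for the life of the process.
    var NeverStop <-chan struct{} = make(chan struct{})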