mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
Clear shutdown of scheduler metrics recorder
This commit is contained in:
parent
32773d61c4
commit
7060953b92
@ -25,6 +25,7 @@ import (
|
|||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
clienttesting "k8s.io/client-go/testing"
|
clienttesting "k8s.io/client-go/testing"
|
||||||
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||||
@ -65,7 +66,7 @@ func TestDefaultBinder(t *testing.T) {
|
|||||||
return true, gotBinding, nil
|
return true, gotBinding, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithClientSet(client))
|
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithClientSet(client))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||||
@ -334,7 +335,7 @@ func TestImageLocalityPriority(t *testing.T) {
|
|||||||
snapshot := cache.NewSnapshot(nil, test.nodes)
|
snapshot := cache.NewSnapshot(nil, test.nodes)
|
||||||
|
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
|
|
||||||
p, _ := New(nil, fh)
|
p, _ := New(nil, fh)
|
||||||
var gotList framework.NodeScoreList
|
var gotList framework.NodeScoreList
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||||
@ -1133,7 +1134,7 @@ func TestNodeAffinityPriority(t *testing.T) {
|
|||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
|
||||||
p, err := New(&test.args, fh)
|
p, err := New(&test.args, fh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Creating plugin: %v", err)
|
t.Fatalf("Creating plugin: %v", err)
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||||
@ -350,7 +351,7 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
|
|||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
snapshot := cache.NewSnapshot(test.pods, test.nodes)
|
snapshot := cache.NewSnapshot(test.pods, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{})
|
p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{})
|
||||||
for i := range test.nodes {
|
for i := range test.nodes {
|
||||||
hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)
|
hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/util/feature"
|
"k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/component-base/featuregate"
|
"k8s.io/component-base/featuregate"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
@ -757,7 +758,7 @@ func TestFitScore(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
args := test.nodeResourcesFitArgs
|
args := test.nodeResourcesFitArgs
|
||||||
p, err := NewFit(&args, fh, plfeature.Features{})
|
p, err := NewFit(&args, fh, plfeature.Features{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||||
@ -376,7 +377,7 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
|
|
||||||
p, err := NewFit(
|
p, err := NewFit(
|
||||||
&config.NodeResourcesFitArgs{
|
&config.NodeResourcesFitArgs{
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||||
@ -332,7 +333,7 @@ func TestMostAllocatedScoringStrategy(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
|
|
||||||
p, err := NewFit(
|
p, err := NewFit(
|
||||||
&config.NodeResourcesFitArgs{
|
&config.NodeResourcesFitArgs{
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||||
@ -110,7 +111,7 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
|
|
||||||
p, err := NewFit(&config.NodeResourcesFitArgs{
|
p, err := NewFit(&config.NodeResourcesFitArgs{
|
||||||
ScoringStrategy: &config.ScoringStrategy{
|
ScoringStrategy: &config.ScoringStrategy{
|
||||||
@ -301,7 +302,7 @@ func TestResourceBinPackingSingleExtended(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
snapshot := cache.NewSnapshot(test.pods, test.nodes)
|
snapshot := cache.NewSnapshot(test.pods, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
args := config.NodeResourcesFitArgs{
|
args := config.NodeResourcesFitArgs{
|
||||||
ScoringStrategy: &config.ScoringStrategy{
|
ScoringStrategy: &config.ScoringStrategy{
|
||||||
Type: config.RequestedToCapacityRatio,
|
Type: config.RequestedToCapacityRatio,
|
||||||
@ -521,7 +522,7 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
snapshot := cache.NewSnapshot(test.pods, test.nodes)
|
snapshot := cache.NewSnapshot(test.pods, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
|
|
||||||
args := config.NodeResourcesFitArgs{
|
args := config.NodeResourcesFitArgs{
|
||||||
ScoringStrategy: &config.ScoringStrategy{
|
ScoringStrategy: &config.ScoringStrategy{
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
@ -497,7 +498,7 @@ func TestPreScoreStateEmptyNodes(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0)
|
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0)
|
||||||
f, err := frameworkruntime.NewFramework(nil, nil,
|
f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
|
||||||
frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)),
|
frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)),
|
||||||
frameworkruntime.WithInformerFactory(informerFactory))
|
frameworkruntime.WithInformerFactory(informerFactory))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1299,7 +1300,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||||
f, err := frameworkruntime.NewFramework(nil, nil,
|
f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
|
||||||
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
||||||
frameworkruntime.WithInformerFactory(informerFactory))
|
frameworkruntime.WithInformerFactory(informerFactory))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
@ -69,7 +70,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
|
|||||||
b.Errorf("error waiting for informer cache sync")
|
b.Errorf("error waiting for informer cache sync")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
|
||||||
pl, err := New(nil, fh)
|
pl, err := New(nil, fh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
@ -402,7 +403,7 @@ func TestSelectorSpreadScore(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("error creating informerFactory: %+v", err)
|
t.Errorf("error creating informerFactory: %+v", err)
|
||||||
}
|
}
|
||||||
fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
|
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("error creating new framework handle: %+v", err)
|
t.Errorf("error creating new framework handle: %+v", err)
|
||||||
}
|
}
|
||||||
@ -660,7 +661,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("error creating informerFactory: %+v", err)
|
t.Errorf("error creating informerFactory: %+v", err)
|
||||||
}
|
}
|
||||||
fh, err := frameworkruntime.NewFramework(nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
|
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("error creating new framework handle: %+v", err)
|
t.Errorf("error creating new framework handle: %+v", err)
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||||
@ -230,7 +231,7 @@ func TestTaintTolerationScore(t *testing.T) {
|
|||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
state := framework.NewCycleState()
|
state := framework.NewCycleState()
|
||||||
snapshot := cache.NewSnapshot(nil, test.nodes)
|
snapshot := cache.NewSnapshot(nil, test.nodes)
|
||||||
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
|
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
|
||||||
|
|
||||||
p, _ := New(nil, fh)
|
p, _ := New(nil, fh)
|
||||||
status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)
|
status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
@ -43,7 +44,7 @@ func SetupPluginWithInformers(
|
|||||||
) framework.Plugin {
|
) framework.Plugin {
|
||||||
objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...)
|
objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...)
|
||||||
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
|
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
|
||||||
fh, err := frameworkruntime.NewFramework(nil, nil,
|
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
|
||||||
frameworkruntime.WithSnapshotSharedLister(sharedLister),
|
frameworkruntime.WithSnapshotSharedLister(sharedLister),
|
||||||
frameworkruntime.WithInformerFactory(informerFactory))
|
frameworkruntime.WithInformerFactory(informerFactory))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -66,7 +67,7 @@ func SetupPlugin(
|
|||||||
config runtime.Object,
|
config runtime.Object,
|
||||||
sharedLister framework.SharedLister,
|
sharedLister framework.SharedLister,
|
||||||
) framework.Plugin {
|
) framework.Plugin {
|
||||||
fh, err := frameworkruntime.NewFramework(nil, nil,
|
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
|
||||||
frameworkruntime.WithSnapshotSharedLister(sharedLister))
|
frameworkruntime.WithSnapshotSharedLister(sharedLister))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tb.Fatalf("Failed creating framework runtime: %v", err)
|
tb.Fatalf("Failed creating framework runtime: %v", err)
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
storagev1 "k8s.io/api/storage/v1"
|
storagev1 "k8s.io/api/storage/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
@ -600,7 +601,7 @@ func TestVolumeBinding(t *testing.T) {
|
|||||||
runtime.WithClientSet(client),
|
runtime.WithClientSet(client),
|
||||||
runtime.WithInformerFactory(informerFactory),
|
runtime.WithInformerFactory(informerFactory),
|
||||||
}
|
}
|
||||||
fh, err := runtime.NewFramework(nil, nil, opts...)
|
fh, err := runtime.NewFramework(nil, nil, wait.NeverStop, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -227,9 +227,9 @@ func WithCaptureProfile(c CaptureProfile) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultFrameworkOptions() frameworkOptions {
|
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
|
||||||
return frameworkOptions{
|
return frameworkOptions{
|
||||||
metricsRecorder: newMetricsRecorder(1000, time.Second),
|
metricsRecorder: newMetricsRecorder(1000, time.Second, stopCh),
|
||||||
clusterEventMap: make(map[framework.ClusterEvent]sets.String),
|
clusterEventMap: make(map[framework.ClusterEvent]sets.String),
|
||||||
parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
|
parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
|
||||||
}
|
}
|
||||||
@ -245,8 +245,8 @@ func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
|
|||||||
var _ framework.Framework = &frameworkImpl{}
|
var _ framework.Framework = &frameworkImpl{}
|
||||||
|
|
||||||
// NewFramework initializes plugins given the configuration and the registry.
|
// NewFramework initializes plugins given the configuration and the registry.
|
||||||
func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
|
func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) {
|
||||||
options := defaultFrameworkOptions()
|
options := defaultFrameworkOptions(stopCh)
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(&options)
|
opt(&options)
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/component-base/metrics/testutil"
|
"k8s.io/component-base/metrics/testutil"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
@ -391,7 +392,7 @@ func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerPr
|
|||||||
if len(profile.Plugins.Bind.Enabled) == 0 {
|
if len(profile.Plugins.Bind.Enabled) == 0 {
|
||||||
profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin})
|
profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin})
|
||||||
}
|
}
|
||||||
return NewFramework(r, &profile, opts...)
|
return NewFramework(r, &profile, wait.NeverStop, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInitFrameworkWithScorePlugins(t *testing.T) {
|
func TestInitFrameworkWithScorePlugins(t *testing.T) {
|
||||||
@ -488,7 +489,7 @@ func TestNewFrameworkErrors(t *testing.T) {
|
|||||||
Plugins: tc.plugins,
|
Plugins: tc.plugins,
|
||||||
PluginConfig: tc.pluginCfg,
|
PluginConfig: tc.pluginCfg,
|
||||||
}
|
}
|
||||||
_, err := NewFramework(registry, profile)
|
_, err := NewFramework(registry, profile, wait.NeverStop)
|
||||||
if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
|
if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
|
||||||
t.Errorf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
|
t.Errorf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
|
||||||
}
|
}
|
||||||
@ -796,7 +797,7 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins})
|
fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, wait.NeverStop)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) {
|
if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) {
|
||||||
t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
|
t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
|
||||||
@ -2228,7 +2229,9 @@ func TestRecordingMetrics(t *testing.T) {
|
|||||||
Bind: pluginSet,
|
Bind: pluginSet,
|
||||||
PostBind: pluginSet,
|
PostBind: pluginSet,
|
||||||
}
|
}
|
||||||
recorder := newMetricsRecorder(100, time.Nanosecond)
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
|
||||||
profile := config.KubeSchedulerProfile{
|
profile := config.KubeSchedulerProfile{
|
||||||
SchedulerName: testProfileName,
|
SchedulerName: testProfileName,
|
||||||
Plugins: plugins,
|
Plugins: plugins,
|
||||||
@ -2241,7 +2244,7 @@ func TestRecordingMetrics(t *testing.T) {
|
|||||||
tt.action(f)
|
tt.action(f)
|
||||||
|
|
||||||
// Stop the goroutine which records metrics and ensure it's stopped.
|
// Stop the goroutine which records metrics and ensure it's stopped.
|
||||||
close(recorder.stopCh)
|
close(stopCh)
|
||||||
<-recorder.isStoppedCh
|
<-recorder.isStoppedCh
|
||||||
// Try to clean up the metrics buffer again in case it's not empty.
|
// Try to clean up the metrics buffer again in case it's not empty.
|
||||||
recorder.flushMetrics()
|
recorder.flushMetrics()
|
||||||
@ -2337,7 +2340,8 @@ func TestRunBindPlugins(t *testing.T) {
|
|||||||
pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name})
|
pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name})
|
||||||
}
|
}
|
||||||
plugins := &config.Plugins{Bind: pluginSet}
|
plugins := &config.Plugins{Bind: pluginSet}
|
||||||
recorder := newMetricsRecorder(100, time.Nanosecond)
|
stopCh := make(chan struct{})
|
||||||
|
recorder := newMetricsRecorder(100, time.Nanosecond, stopCh)
|
||||||
profile := config.KubeSchedulerProfile{
|
profile := config.KubeSchedulerProfile{
|
||||||
SchedulerName: testProfileName,
|
SchedulerName: testProfileName,
|
||||||
Plugins: plugins,
|
Plugins: plugins,
|
||||||
@ -2353,7 +2357,7 @@ func TestRunBindPlugins(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stop the goroutine which records metrics and ensure it's stopped.
|
// Stop the goroutine which records metrics and ensure it's stopped.
|
||||||
close(recorder.stopCh)
|
close(stopCh)
|
||||||
<-recorder.isStoppedCh
|
<-recorder.isStoppedCh
|
||||||
// Try to clean up the metrics buffer again in case it's not empty.
|
// Try to clean up the metrics buffer again in case it's not empty.
|
||||||
recorder.flushMetrics()
|
recorder.flushMetrics()
|
||||||
|
@ -43,18 +43,18 @@ type metricsRecorder struct {
|
|||||||
|
|
||||||
// stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only
|
// stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only
|
||||||
// used in tests.
|
// used in tests.
|
||||||
stopCh chan struct{}
|
stopCh <-chan struct{}
|
||||||
// isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
|
// isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
|
||||||
// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
|
// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
|
||||||
isStoppedCh chan struct{}
|
isStoppedCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder {
|
func newMetricsRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *metricsRecorder {
|
||||||
recorder := &metricsRecorder{
|
recorder := &metricsRecorder{
|
||||||
bufferCh: make(chan *frameworkMetric, bufferSize),
|
bufferCh: make(chan *frameworkMetric, bufferSize),
|
||||||
bufferSize: bufferSize,
|
bufferSize: bufferSize,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
stopCh: make(chan struct{}),
|
stopCh: stopCh,
|
||||||
isStoppedCh: make(chan struct{}),
|
isStoppedCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
go recorder.run()
|
go recorder.run()
|
||||||
|
@ -35,10 +35,10 @@ type RecorderFactory func(string) events.EventRecorder
|
|||||||
|
|
||||||
// newProfile builds a Profile for the given configuration.
|
// newProfile builds a Profile for the given configuration.
|
||||||
func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
|
func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
|
||||||
opts ...frameworkruntime.Option) (framework.Framework, error) {
|
stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) {
|
||||||
recorder := recorderFact(cfg.SchedulerName)
|
recorder := recorderFact(cfg.SchedulerName)
|
||||||
opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
|
opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
|
||||||
return frameworkruntime.NewFramework(r, &cfg, opts...)
|
return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map holds frameworks indexed by scheduler name.
|
// Map holds frameworks indexed by scheduler name.
|
||||||
@ -46,12 +46,12 @@ type Map map[string]framework.Framework
|
|||||||
|
|
||||||
// NewMap builds the frameworks given by the configuration, indexed by name.
|
// NewMap builds the frameworks given by the configuration, indexed by name.
|
||||||
func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
|
func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
|
||||||
opts ...frameworkruntime.Option) (Map, error) {
|
stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
|
||||||
m := make(Map)
|
m := make(Map)
|
||||||
v := cfgValidator{m: m}
|
v := cfgValidator{m: m}
|
||||||
|
|
||||||
for _, cfg := range cfgs {
|
for _, cfg := range cfgs {
|
||||||
p, err := newProfile(cfg, r, recorderFact, opts...)
|
p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
|
return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
|
||||||
}
|
}
|
||||||
|
@ -246,7 +246,9 @@ func TestNewMap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory)
|
stopCh := make(chan struct{})
|
||||||
|
defer close(stopCh)
|
||||||
|
m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory, stopCh)
|
||||||
if err := checkErr(err, tc.wantErr); err != nil {
|
if err := checkErr(err, tc.wantErr); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -278,7 +278,7 @@ func New(client clientset.Interface,
|
|||||||
snapshot := internalcache.NewEmptySnapshot()
|
snapshot := internalcache.NewEmptySnapshot()
|
||||||
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
|
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
|
||||||
|
|
||||||
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory,
|
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
|
||||||
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
|
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
|
||||||
frameworkruntime.WithClientSet(client),
|
frameworkruntime.WithClientSet(client),
|
||||||
frameworkruntime.WithKubeConfig(options.kubeConfig),
|
frameworkruntime.WithKubeConfig(options.kubeConfig),
|
||||||
|
@ -18,6 +18,7 @@ package testing
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kube-scheduler/config/v1beta2"
|
"k8s.io/kube-scheduler/config/v1beta2"
|
||||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
|
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
|
||||||
@ -37,7 +38,7 @@ func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime.
|
|||||||
for _, f := range fns {
|
for _, f := range fns {
|
||||||
f(®istry, profile)
|
f(®istry, profile)
|
||||||
}
|
}
|
||||||
return runtime.NewFramework(registry, profile, opts...)
|
return runtime.NewFramework(registry, profile, wait.NeverStop, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterPluginFunc is a function signature used in method RegisterFilterPlugin()
|
// RegisterPluginFunc is a function signature used in method RegisterFilterPlugin()
|
||||||
|
Loading…
Reference in New Issue
Block a user