kube-scheduler: NewFramework function to pass the context parameter

Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>
This commit is contained in:
Mengjiao Liu 2023-05-15 18:36:17 +08:00
parent b31774d39b
commit 1c05cf1d51
31 changed files with 222 additions and 164 deletions

View File

@ -345,11 +345,11 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
recorderFactory := getRecorderFactory(&cc) recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0) completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// Create the scheduler. // Create the scheduler.
sched, err := scheduler.New(cc.Client, sched, err := scheduler.New(ctx,
cc.Client,
cc.InformerFactory, cc.InformerFactory,
cc.DynInformerFactory, cc.DynInformerFactory,
recorderFactory, recorderFactory,
ctx.Done(),
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion), scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
scheduler.WithKubeConfig(cc.KubeConfig), scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...), scheduler.WithProfiles(cc.ComponentConfig.Profiles...),

View File

@ -288,7 +288,8 @@ func TestSchedulerWithExtenders(t *testing.T) {
cache.AddNode(createNode(name)) cache.AddNode(createNode(name))
} }
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
test.registerPlugins, "", ctx.Done(), ctx,
test.registerPlugins, "",
runtime.WithClientSet(client), runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory), runtime.WithInformerFactory(informerFactory),
runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),

View File

@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
) )
@ -42,8 +43,9 @@ func TestFrameworkContract(t *testing.T) {
} }
func TestNewFramework(t *testing.T) { func TestNewFramework(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
var f interface{} var f interface{}
if f, _ = runtime.NewFramework(nil, nil, nil); f != nil { if f, _ = runtime.NewFramework(ctx, nil, nil); f != nil {
_, ok := f.(framework.Framework) _, ok := f.(framework.Framework)
assert.True(t, ok) assert.True(t, ok)
} }

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
"k8s.io/klog/v2/ktesting"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing" st "k8s.io/kubernetes/pkg/scheduler/testing"
) )
@ -52,7 +53,8 @@ func TestDefaultBinder(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
var gotBinding *v1.Binding var gotBinding *v1.Binding
@ -68,7 +70,7 @@ func TestDefaultBinder(t *testing.T) {
return true, gotBinding, nil return true, gotBinding, nil
}) })
fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithClientSet(client)) fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithClientSet(client))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -355,7 +355,7 @@ func TestPostFilter(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), f, err := st.NewFramework(ctx, registeredPlugins, "",
frameworkruntime.WithClientSet(cs), frameworkruntime.WithClientSet(cs),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
@ -1093,7 +1093,8 @@ func TestDryRunPreemption(t *testing.T) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registeredPlugins, "", ctx.Done(), ctx,
registeredPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
@ -1346,13 +1347,13 @@ func TestSelectBestCandidate(t *testing.T) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
ctx,
[]st.RegisterPluginFunc{ []st.RegisterPluginFunc{
tt.registerPlugin, tt.registerPlugin,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithLogger(logger), frameworkruntime.WithLogger(logger),
@ -1485,7 +1486,9 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var nodes []*v1.Node var nodes []*v1.Node
for _, n := range test.nodes { for _, n := range test.nodes {
nodes = append(nodes, st.MakeNode().Name(n).Obj()) nodes = append(nodes, st.MakeNode().Name(n).Obj())
@ -1494,9 +1497,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
} }
stopCh := make(chan struct{}) f, err := st.NewFramework(ctx, registeredPlugins, "",
defer close(stopCh)
f, err := st.NewFramework(registeredPlugins, "", stopCh,
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithLogger(logger), frameworkruntime.WithLogger(logger),
) )
@ -1730,13 +1731,13 @@ func TestPreempt(t *testing.T) {
extenders = append(extenders, extender) extenders = append(extenders, extender)
} }
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
ctx,
[]st.RegisterPluginFunc{ []st.RegisterPluginFunc{
test.registerPlugin, test.registerPlugin,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
ctx.Done(),
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders), frameworkruntime.WithExtenders(extenders),

View File

@ -680,7 +680,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
runtime.WithClientSet(tc.client), runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(informerFactory), runtime.WithInformerFactory(informerFactory),
} }
fh, err := runtime.NewFramework(nil, nil, tc.ctx.Done(), opts...) fh, err := runtime.NewFramework(ctx, nil, nil, opts...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -25,6 +25,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -331,12 +332,13 @@ func TestImageLocalityPriority(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
snapshot := cache.NewSnapshot(nil, test.nodes) snapshot := cache.NewSnapshot(nil, test.nodes)
state := framework.NewCycleState() state := framework.NewCycleState()
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
p, _ := New(nil, fh) p, _ := New(nil, fh)
var gotList framework.NodeScoreList var gotList framework.NodeScoreList

View File

@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -1134,11 +1135,12 @@ func TestNodeAffinityPriority(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
state := framework.NewCycleState() state := framework.NewCycleState()
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes))) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
p, err := New(&test.args, fh) p, err := New(&test.args, fh)
if err != nil { if err != nil {
t.Fatalf("Creating plugin: %v", err) t.Fatalf("Creating plugin: %v", err)

View File

@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -384,9 +385,10 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(test.pods, test.nodes) snapshot := cache.NewSnapshot(test.pods, test.nodes)
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{}) p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{})
state := framework.NewCycleState() state := framework.NewCycleState()
for i := range test.nodes { for i := range test.nodes {

View File

@ -25,6 +25,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -836,12 +837,13 @@ func TestFitScore(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
state := framework.NewCycleState() state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes) snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
args := test.nodeResourcesFitArgs args := test.nodeResourcesFitArgs
p, err := NewFit(&args, fh, plfeature.Features{}) p, err := NewFit(&args, fh, plfeature.Features{})
if err != nil { if err != nil {
@ -958,6 +960,9 @@ func BenchmarkTestFitScore(b *testing.B) {
for _, test := range tests { for _, test := range tests {
b.Run(test.name, func(b *testing.B) { b.Run(test.name, func(b *testing.B) {
_, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
existingPods := []*v1.Pod{ existingPods := []*v1.Pod{
st.MakePod().Node("node1").Req(map[v1.ResourceName]string{"cpu": "2000", "memory": "4000"}).Obj(), st.MakePod().Node("node1").Req(map[v1.ResourceName]string{"cpu": "2000", "memory": "4000"}).Obj(),
} }
@ -966,15 +971,14 @@ func BenchmarkTestFitScore(b *testing.B) {
} }
state := framework.NewCycleState() state := framework.NewCycleState()
var nodeResourcesFunc = runtime.FactoryAdapter(plfeature.Features{}, NewFit) var nodeResourcesFunc = runtime.FactoryAdapter(plfeature.Features{}, NewFit)
pl := plugintesting.SetupPlugin(b, nodeResourcesFunc, &test.nodeResourcesFitArgs, cache.NewSnapshot(existingPods, nodes)) pl := plugintesting.SetupPlugin(ctx, b, nodeResourcesFunc, &test.nodeResourcesFitArgs, cache.NewSnapshot(existingPods, nodes))
p := pl.(*Fit) p := pl.(*Fit)
b.ResetTimer() b.ResetTimer()
requestedPod := st.MakePod().Req(map[v1.ResourceName]string{"cpu": "1000", "memory": "2000"}).Obj() requestedPod := st.MakePod().Req(map[v1.ResourceName]string{"cpu": "1000", "memory": "2000"}).Obj()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, status := p.Score(ctx, state, requestedPod, nodes[0].Name)
_, status := p.Score(context.Background(), state, requestedPod, nodes[0].Name)
if !status.IsSuccess() { if !status.IsSuccess() {
b.Errorf("unexpected status: %v", status) b.Errorf("unexpected status: %v", status)
} }

View File

@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -385,12 +386,13 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
state := framework.NewCycleState() state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes) snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
p, err := NewFit( p, err := NewFit(
&config.NodeResourcesFitArgs{ &config.NodeResourcesFitArgs{

View File

@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -342,12 +343,13 @@ func TestMostAllocatedScoringStrategy(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
state := framework.NewCycleState() state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes) snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
p, err := NewFit( p, err := NewFit(
&config.NodeResourcesFitArgs{ &config.NodeResourcesFitArgs{

View File

@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -103,12 +103,13 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
state := framework.NewCycleState() state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes) snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
p, err := NewFit(&config.NodeResourcesFitArgs{ p, err := NewFit(&config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{ ScoringStrategy: &config.ScoringStrategy{
@ -303,7 +304,8 @@ func TestResourceBinPackingSingleExtended(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState() state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.pods, test.nodes) snapshot := cache.NewSnapshot(test.pods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) _, ctx := ktesting.NewTestContext(t)
fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
args := config.NodeResourcesFitArgs{ args := config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{ ScoringStrategy: &config.ScoringStrategy{
Type: config.RequestedToCapacityRatio, Type: config.RequestedToCapacityRatio,
@ -527,7 +529,8 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState() state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.pods, test.nodes) snapshot := cache.NewSnapshot(test.pods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot)) _, ctx := ktesting.NewTestContext(t)
fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
args := config.NodeResourcesFitArgs{ args := config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{ ScoringStrategy: &config.ScoringStrategy{

View File

@ -1986,7 +1986,7 @@ func TestPreFilterStateAddPod(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy
@ -2302,7 +2302,7 @@ func TestPreFilterStateRemovePod(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy
@ -2376,8 +2376,8 @@ func BenchmarkFilter(b *testing.B) {
var state *framework.CycleState var state *framework.CycleState
b.Run(tt.name, func(b *testing.B) { b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, _ := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) existingPods, allNodes, _ := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
ctx := context.Background() _, ctx := ktesting.NewTestContext(b)
pl := plugintesting.SetupPlugin(b, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) pl := plugintesting.SetupPlugin(ctx, b, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes))
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -3007,7 +3007,7 @@ func TestSingleConstraint(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
p.enableMinDomainsInPodTopologySpread = tt.enableMinDomains p.enableMinDomainsInPodTopologySpread = tt.enableMinDomains
p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy
@ -3352,7 +3352,7 @@ func TestMultipleConstraints(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy
state := framework.NewCycleState() state := framework.NewCycleState()
@ -3376,7 +3376,8 @@ func TestPreFilterDisabled(t *testing.T) {
nodeInfo := framework.NewNodeInfo() nodeInfo := framework.NewNodeInfo()
node := v1.Node{} node := v1.Node{}
nodeInfo.SetNode(&node) nodeInfo.SetNode(&node)
p := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewEmptySnapshot()) _, ctx := ktesting.NewTestContext(t)
p := plugintesting.SetupPlugin(ctx, t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewEmptySnapshot())
cycleState := framework.NewCycleState() cycleState := framework.NewCycleState()
gotStatus := p.(*PodTopologySpread).Filter(context.Background(), cycleState, pod, nodeInfo) gotStatus := p.(*PodTopologySpread).Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.AsStatus(fmt.Errorf(`reading "PreFilterPodTopologySpread" from cycleState: %w`, framework.ErrNotFound)) wantStatus := framework.AsStatus(fmt.Errorf(`reading "PreFilterPodTopologySpread" from cycleState: %w`, framework.ErrNotFound))

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -526,10 +527,11 @@ func TestPreScoreStateEmptyNodes(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0) informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0)
f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), f, err := frameworkruntime.NewFramework(ctx, nil, nil,
frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)), frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)),
frameworkruntime.WithInformerFactory(informerFactory)) frameworkruntime.WithInformerFactory(informerFactory))
if err != nil { if err != nil {
@ -1364,12 +1366,13 @@ func BenchmarkTestPodTopologySpreadScore(b *testing.B) {
} }
for _, tt := range tests { for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) { b.Run(tt.name, func(b *testing.B) {
_, ctx := ktesting.NewTestContext(b)
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
state := framework.NewCycleState() state := framework.NewCycleState()
pl := plugintesting.SetupPlugin(b, podTopologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes)) pl := plugintesting.SetupPlugin(ctx, b, podTopologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, cache.NewSnapshot(existingPods, allNodes))
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
status := p.PreScore(context.Background(), state, tt.pod, filteredNodes) status := p.PreScore(ctx, state, tt.pod, filteredNodes)
if !status.IsSuccess() { if !status.IsSuccess() {
b.Fatalf("unexpected error: %v", status) b.Fatalf("unexpected error: %v", status)
} }
@ -1434,10 +1437,11 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
client := fake.NewSimpleClientset( client := fake.NewSimpleClientset(
&v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}}, &v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}},
) )
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), f, err := frameworkruntime.NewFramework(ctx, nil, nil,
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory)) frameworkruntime.WithInformerFactory(informerFactory))
if err != nil { if err != nil {

View File

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -58,7 +59,8 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
client := fake.NewSimpleClientset( client := fake.NewSimpleClientset(
&v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}}, &v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}},
) )
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
_ = informerFactory.Core().V1().Services().Lister() _ = informerFactory.Core().V1().Services().Lister()
@ -69,7 +71,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
b.Errorf("error waiting for informer cache sync") b.Errorf("error waiting for informer cache sync")
} }
} }
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
pl, err := New(nil, fh) pl, err := New(nil, fh)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)

View File

@ -28,9 +28,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -397,13 +397,14 @@ func TestSelectorSpreadScore(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
nodes := makeNodeList(test.nodes) nodes := makeNodeList(test.nodes)
snapshot := cache.NewSnapshot(test.pods, nodes) snapshot := cache.NewSnapshot(test.pods, nodes)
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss) informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss)
if err != nil { if err != nil {
t.Errorf("error creating informerFactory: %+v", err) t.Errorf("error creating informerFactory: %+v", err)
} }
fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
if err != nil { if err != nil {
t.Errorf("error creating new framework handle: %+v", err) t.Errorf("error creating new framework handle: %+v", err)
} }
@ -655,13 +656,14 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
nodes := makeLabeledNodeList(labeledNodes) nodes := makeLabeledNodeList(labeledNodes)
snapshot := cache.NewSnapshot(test.pods, nodes) snapshot := cache.NewSnapshot(test.pods, nodes)
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss) informerFactory, err := populateAndStartInformers(ctx, test.rcs, test.rss, test.services, test.sss)
if err != nil { if err != nil {
t.Errorf("error creating informerFactory: %+v", err) t.Errorf("error creating informerFactory: %+v", err)
} }
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory)) fh, err := frameworkruntime.NewFramework(ctx, nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
if err != nil { if err != nil {
t.Errorf("error creating new framework handle: %+v", err) t.Errorf("error creating new framework handle: %+v", err)
} }

View File

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -228,12 +229,13 @@ func TestTaintTolerationScore(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
state := framework.NewCycleState() state := framework.NewCycleState()
snapshot := cache.NewSnapshot(nil, test.nodes) snapshot := cache.NewSnapshot(nil, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot)) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
p, _ := New(nil, fh) p, _ := New(nil, fh)
status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)

View File

@ -23,7 +23,6 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
@ -44,7 +43,7 @@ func SetupPluginWithInformers(
) framework.Plugin { ) framework.Plugin {
objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...) objs = append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, objs...)
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0) informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, fh, err := frameworkruntime.NewFramework(ctx, nil, nil,
frameworkruntime.WithSnapshotSharedLister(sharedLister), frameworkruntime.WithSnapshotSharedLister(sharedLister),
frameworkruntime.WithInformerFactory(informerFactory)) frameworkruntime.WithInformerFactory(informerFactory))
if err != nil { if err != nil {
@ -62,12 +61,13 @@ func SetupPluginWithInformers(
// SetupPlugin creates a plugin using a framework handle that includes // SetupPlugin creates a plugin using a framework handle that includes
// the provided sharedLister. // the provided sharedLister.
func SetupPlugin( func SetupPlugin(
ctx context.Context,
tb testing.TB, tb testing.TB,
pf frameworkruntime.PluginFactory, pf frameworkruntime.PluginFactory,
config runtime.Object, config runtime.Object,
sharedLister framework.SharedLister, sharedLister framework.SharedLister,
) framework.Plugin { ) framework.Plugin {
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, fh, err := frameworkruntime.NewFramework(ctx, nil, nil,
frameworkruntime.WithSnapshotSharedLister(sharedLister)) frameworkruntime.WithSnapshotSharedLister(sharedLister))
if err != nil { if err != nil {
tb.Fatalf("Failed creating framework runtime: %v", err) tb.Fatalf("Failed creating framework runtime: %v", err)

View File

@ -28,9 +28,9 @@ import (
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -781,7 +781,8 @@ func TestVolumeBinding(t *testing.T) {
for _, item := range table { for _, item := range table {
t.Run(item.name, func(t *testing.T) { t.Run(item.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
@ -789,7 +790,7 @@ func TestVolumeBinding(t *testing.T) {
runtime.WithClientSet(client), runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory), runtime.WithInformerFactory(informerFactory),
} }
fh, err := runtime.NewFramework(nil, nil, wait.NeverStop, opts...) fh, err := runtime.NewFramework(ctx, nil, nil, opts...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -281,10 +281,12 @@ func TestDryRunPreemption(t *testing.T) {
} }
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0) informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
parallelism := parallelize.DefaultParallelism parallelism := parallelize.DefaultParallelism
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registeredPlugins, "", ctx.Done(), ctx,
registeredPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithParallelism(parallelism), frameworkruntime.WithParallelism(parallelism),

View File

@ -250,13 +250,13 @@ func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
var _ framework.Framework = &frameworkImpl{} var _ framework.Framework = &frameworkImpl{}
// NewFramework initializes plugins given the configuration and the registry. // NewFramework initializes plugins given the configuration and the registry.
func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) { func NewFramework(ctx context.Context, r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
options := defaultFrameworkOptions(stopCh) options := defaultFrameworkOptions(ctx.Done())
for _, opt := range opts { for _, opt := range opts {
opt(&options) opt(&options)
} }
logger := klog.TODO() logger := klog.FromContext(ctx)
if options.logger != nil { if options.logger != nil {
logger = *options.logger logger = *options.logger
} }

View File

@ -31,8 +31,8 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -405,7 +405,7 @@ var (
errInjectedFilterStatus = errors.New(injectFilterReason) errInjectedFilterStatus = errors.New(injectFilterReason)
) )
func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) { func newFrameworkWithQueueSortAndBind(ctx context.Context, r Registry, profile config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
if _, ok := r[queueSortPlugin]; !ok { if _, ok := r[queueSortPlugin]; !ok {
r[queueSortPlugin] = newQueueSortPlugin r[queueSortPlugin] = newQueueSortPlugin
} }
@ -419,7 +419,7 @@ func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerPr
if len(profile.Plugins.Bind.Enabled) == 0 { if len(profile.Plugins.Bind.Enabled) == 0 {
profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin}) profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin})
} }
return NewFramework(r, &profile, stopCh, opts...) return NewFramework(ctx, r, &profile, opts...)
} }
func TestInitFrameworkWithScorePlugins(t *testing.T) { func TestInitFrameworkWithScorePlugins(t *testing.T) {
@ -460,9 +460,10 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: tt.plugins} profile := config.KubeSchedulerProfile{Plugins: tt.plugins}
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
_, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh) defer cancel()
_, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if tt.initErr && err == nil { if tt.initErr && err == nil {
t.Fatal("Framework initialization should fail") t.Fatal("Framework initialization should fail")
} }
@ -514,11 +515,14 @@ func TestNewFrameworkErrors(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
profile := &config.KubeSchedulerProfile{ profile := &config.KubeSchedulerProfile{
Plugins: tc.plugins, Plugins: tc.plugins,
PluginConfig: tc.pluginCfg, PluginConfig: tc.pluginCfg,
} }
_, err := NewFramework(registry, profile, wait.NeverStop) _, err := NewFramework(ctx, registry, profile)
if err == nil || !strings.Contains(err.Error(), tc.wantErr) { if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
t.Errorf("Unexpected error, got %v, expect: %s", err, tc.wantErr) t.Errorf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
} }
@ -826,9 +830,10 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, stopCh) defer cancel()
fw, err := NewFramework(ctx, registry, &config.KubeSchedulerProfile{Plugins: tc.plugins})
if err != nil { if err != nil {
if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) { if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) {
t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr) t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
@ -993,9 +998,10 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
got := make(map[framework.ClusterEvent]sets.Set[string]) got := make(map[framework.ClusterEvent]sets.Set[string])
profile := config.KubeSchedulerProfile{Plugins: cfgPls} profile := config.KubeSchedulerProfile{Plugins: cfgPls}
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
_, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh, WithClusterEventMap(got)) defer cancel()
_, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, WithClusterEventMap(got))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1048,7 +1054,7 @@ func TestPreEnqueuePlugins(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: cfgPls} profile := config.KubeSchedulerProfile{Plugins: cfgPls}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
@ -1165,9 +1171,9 @@ func TestRunPreScorePlugins(t *testing.T) {
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind( f, err := newFrameworkWithQueueSortAndBind(
ctx,
r, r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreScore: config.PluginSet{Enabled: enabled}}}, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreScore: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
) )
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
@ -1546,7 +1552,7 @@ func TestRunScorePlugins(t *testing.T) {
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
} }
@ -1590,7 +1596,7 @@ func TestPreFilterPlugins(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
} }
@ -1724,9 +1730,9 @@ func TestRunPreFilterPlugins(t *testing.T) {
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind( f, err := newFrameworkWithQueueSortAndBind(
ctx,
r, r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
) )
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
@ -1812,9 +1818,9 @@ func TestRunPreFilterExtensionRemovePod(t *testing.T) {
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind( f, err := newFrameworkWithQueueSortAndBind(
ctx,
r, r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
) )
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
@ -1894,9 +1900,9 @@ func TestRunPreFilterExtensionAddPod(t *testing.T) {
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind( f, err := newFrameworkWithQueueSortAndBind(
ctx,
r, r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}}, config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
) )
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
@ -2100,7 +2106,7 @@ func TestFilterPlugins(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
@ -2224,7 +2230,7 @@ func TestPostFilterPlugins(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: cfgPls} profile := config.KubeSchedulerProfile{Plugins: cfgPls}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
@ -2373,7 +2379,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: cfgPls} profile := config.KubeSchedulerProfile{Plugins: cfgPls}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done(), WithPodNominator(podNominator)) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, WithPodNominator(podNominator))
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
@ -2530,7 +2536,7 @@ func TestPreBindPlugins(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: configPlugins} profile := config.KubeSchedulerProfile{Plugins: configPlugins}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
@ -2688,7 +2694,7 @@ func TestReservePlugins(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: configPlugins} profile := config.KubeSchedulerProfile{Plugins: configPlugins}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
@ -2814,7 +2820,7 @@ func TestPermitPlugins(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: configPlugins} profile := config.KubeSchedulerProfile{Plugins: configPlugins}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("fail to create framework: %s", err) t.Fatalf("fail to create framework: %s", err)
} }
@ -2984,23 +2990,25 @@ func TestRecordingMetrics(t *testing.T) {
PostBind: pluginSet, PostBind: pluginSet,
} }
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh) ctx, cancel := context.WithCancel(ctx)
recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, ctx.Done())
profile := config.KubeSchedulerProfile{ profile := config.KubeSchedulerProfile{
PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore), PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore),
SchedulerName: testProfileName, SchedulerName: testProfileName,
Plugins: plugins, Plugins: plugins,
} }
f, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder)) f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
if err != nil { if err != nil {
close(stopCh) cancel()
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
} }
tt.action(f) tt.action(f)
// Stop the goroutine which records metrics and ensure it's stopped. // Stop the goroutine which records metrics and ensure it's stopped.
close(stopCh) cancel()
<-recorder.IsStoppedCh <-recorder.IsStoppedCh
// Try to clean up the metrics buffer again in case it's not empty. // Try to clean up the metrics buffer again in case it's not empty.
recorder.FlushMetrics() recorder.FlushMetrics()
@ -3096,16 +3104,17 @@ func TestRunBindPlugins(t *testing.T) {
pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name}) pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name})
} }
plugins := &config.Plugins{Bind: pluginSet} plugins := &config.Plugins{Bind: pluginSet}
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, stopCh) ctx, cancel := context.WithCancel(ctx)
recorder := metrics.NewMetricsAsyncRecorder(100, time.Nanosecond, ctx.Done())
profile := config.KubeSchedulerProfile{ profile := config.KubeSchedulerProfile{
SchedulerName: testProfileName, SchedulerName: testProfileName,
PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore), PercentageOfNodesToScore: pointer.Int32(testPercentageOfNodesToScore),
Plugins: plugins, Plugins: plugins,
} }
fwk, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder)) fwk, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
if err != nil { if err != nil {
close(stopCh) cancel()
t.Fatal(err) t.Fatal(err)
} }
@ -3115,7 +3124,7 @@ func TestRunBindPlugins(t *testing.T) {
} }
// Stop the goroutine which records metrics and ensure it's stopped. // Stop the goroutine which records metrics and ensure it's stopped.
close(stopCh) cancel()
<-recorder.IsStoppedCh <-recorder.IsStoppedCh
// Try to clean up the metrics buffer again in case it's not empty. // Try to clean up the metrics buffer again in case it's not empty.
recorder.FlushMetrics() recorder.FlushMetrics()
@ -3160,7 +3169,7 @@ func TestPermitWaitDurationMetric(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: plugins} profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
} }
@ -3216,7 +3225,7 @@ func TestWaitOnPermit(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: plugins} profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done()) f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
} }
@ -3267,9 +3276,10 @@ func TestListPlugins(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: tt.plugins} profile := config.KubeSchedulerProfile{Plugins: tt.plugins}
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
f, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh) defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil { if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
} }

View File

@ -18,6 +18,7 @@ limitations under the License.
package profile package profile
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
@ -34,24 +35,24 @@ import (
type RecorderFactory func(string) events.EventRecorder type RecorderFactory func(string) events.EventRecorder
// newProfile builds a Profile for the given configuration. // newProfile builds a Profile for the given configuration.
func newProfile(cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory, func newProfile(ctx context.Context, cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
stopCh <-chan struct{}, opts ...frameworkruntime.Option) (framework.Framework, error) { opts ...frameworkruntime.Option) (framework.Framework, error) {
recorder := recorderFact(cfg.SchedulerName) recorder := recorderFact(cfg.SchedulerName)
opts = append(opts, frameworkruntime.WithEventRecorder(recorder)) opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
return frameworkruntime.NewFramework(r, &cfg, stopCh, opts...) return frameworkruntime.NewFramework(ctx, r, &cfg, opts...)
} }
// Map holds frameworks indexed by scheduler name. // Map holds frameworks indexed by scheduler name.
type Map map[string]framework.Framework type Map map[string]framework.Framework
// NewMap builds the frameworks given by the configuration, indexed by name. // NewMap builds the frameworks given by the configuration, indexed by name.
func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory, func NewMap(ctx context.Context, cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) { opts ...frameworkruntime.Option) (Map, error) {
m := make(Map) m := make(Map)
v := cfgValidator{m: m} v := cfgValidator{m: m}
for _, cfg := range cfgs { for _, cfg := range cfgs {
p, err := newProfile(cfg, r, recorderFact, stopCh, opts...) p, err := newProfile(ctx, cfg, r, recorderFact, opts...)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err) return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
} }

View File

@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -246,9 +247,10 @@ func TestNewMap(t *testing.T) {
} }
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
m, err := NewMap(tc.cfgs, fakeRegistry, nilRecorderFactory, stopCh) defer cancel()
m, err := NewMap(ctx, tc.cfgs, fakeRegistry, nilRecorderFactory)
if err := checkErr(err, tc.wantErr); err != nil { if err := checkErr(err, tc.wantErr); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -395,11 +395,11 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := New( sched, err := New(
ctx,
client, client,
informerFactory, informerFactory,
nil, nil,
profile.NewRecorderFactory(broadcaster), profile.NewRecorderFactory(broadcaster),
ctx.Done(),
WithProfiles( WithProfiles(
schedulerapi.KubeSchedulerProfile{SchedulerName: "match-node2", schedulerapi.KubeSchedulerProfile{SchedulerName: "match-node2",
Plugins: &schedulerapi.Plugins{ Plugins: &schedulerapi.Plugins{
@ -526,11 +526,11 @@ func TestSchedulerGuaranteeNonNilNodeInSchedulingCycle(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := New( sched, err := New(
ctx,
client, client,
informerFactory, informerFactory,
nil, nil,
profile.NewRecorderFactory(broadcaster), profile.NewRecorderFactory(broadcaster),
ctx.Done(),
WithProfiles( WithProfiles(
schedulerapi.KubeSchedulerProfile{SchedulerName: fakeSchedulerName, schedulerapi.KubeSchedulerProfile{SchedulerName: fakeSchedulerName,
Plugins: &schedulerapi.Plugins{ Plugins: &schedulerapi.Plugins{
@ -748,9 +748,9 @@ func TestSchedulerScheduleOne(t *testing.T) {
) )
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fwk, err := st.NewFramework(registerPluginFuncs, fwk, err := st.NewFramework(ctx,
registerPluginFuncs,
testSchedulerName, testSchedulerName,
ctx.Done(),
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName))) frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
if err != nil { if err != nil {
@ -1223,10 +1223,11 @@ func TestSchedulerBinding(t *testing.T) {
}) })
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fwk, err := st.NewFramework([]st.RegisterPluginFunc{ fwk, err := st.NewFramework(ctx,
[]st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, "", ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{})) }, "", frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -2229,7 +2230,8 @@ func TestSchedulerSchedulePod(t *testing.T) {
} }
snapshot := internalcache.NewSnapshot(test.pods, nodes) snapshot := internalcache.NewSnapshot(test.pods, nodes)
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
test.registerPlugins, "", ctx.Done(), ctx,
test.registerPlugins, "",
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
@ -2282,6 +2284,7 @@ func TestFindFitAllError(t *testing.T) {
scheduler := makeScheduler(ctx, nodes) scheduler := makeScheduler(ctx, nodes)
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
ctx,
[]st.RegisterPluginFunc{ []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
@ -2289,7 +2292,6 @@ func TestFindFitAllError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
@ -2322,6 +2324,7 @@ func TestFindFitSomeError(t *testing.T) {
scheduler := makeScheduler(ctx, nodes) scheduler := makeScheduler(ctx, nodes)
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
ctx,
[]st.RegisterPluginFunc{ []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
@ -2329,7 +2332,6 @@ func TestFindFitSomeError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
@ -2404,7 +2406,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registerPlugins, "", ctx.Done(), ctx,
registerPlugins, "",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
@ -2540,7 +2543,8 @@ func TestZeroRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
pluginRegistrations, "", ctx.Done(), ctx,
pluginRegistrations, "",
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
@ -2773,8 +2777,8 @@ func Test_prioritizeNodes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
ctx,
test.pluginRegistrations, "", test.pluginRegistrations, "",
ctx.Done(),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
@ -2888,13 +2892,13 @@ func TestFairEvaluationForNodes(t *testing.T) {
sched := makeScheduler(ctx, nodes) sched := makeScheduler(ctx, nodes)
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
ctx,
[]st.RegisterPluginFunc{ []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
"", "",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
) )
if err != nil { if err != nil {
@ -2972,7 +2976,8 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
} }
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registerPlugins, "", ctx.Done(), ctx,
registerPlugins, "",
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
) )
@ -3123,9 +3128,9 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
fwk, _ := st.NewFramework( fwk, _ := st.NewFramework(
ctx,
fns, fns,
testSchedulerName, testSchedulerName,
ctx.Done(),
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithEventRecorder(recorder),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),

View File

@ -239,17 +239,14 @@ var defaultSchedulerOptions = schedulerOptions{
} }
// New returns a Scheduler // New returns a Scheduler
func New(client clientset.Interface, func New(ctx context.Context,
client clientset.Interface,
informerFactory informers.SharedInformerFactory, informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory, recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) { opts ...Option) (*Scheduler, error) {
stopEverything := stopCh stopEverything := ctx.Done()
if stopEverything == nil {
stopEverything = wait.NeverStop
}
options := defaultSchedulerOptions options := defaultSchedulerOptions
for _, opt := range opts { for _, opt := range opts {
@ -283,9 +280,9 @@ func New(client clientset.Interface,
snapshot := internalcache.NewEmptySnapshot() snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.Set[string]) clusterEventMap := make(map[framework.ClusterEvent]sets.Set[string])
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh) metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh, profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig), frameworkruntime.WithKubeConfig(options.kubeConfig),

View File

@ -36,6 +36,7 @@ import (
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/klog/v2/ktesting"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
@ -173,14 +174,15 @@ func TestSchedulerCreation(t *testing.T) {
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
defer cancel()
s, err := New( s, err := New(
ctx,
client, client,
informerFactory, informerFactory,
nil, nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
stopCh,
tc.opts..., tc.opts...,
) )
@ -284,7 +286,7 @@ func TestFailureHandler(t *testing.T) {
queue.Delete(testPod) queue.Delete(testPod)
} }
s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -359,7 +361,7 @@ func TestFailureHandler_NodeNotFound(t *testing.T) {
} }
} }
s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -398,7 +400,7 @@ func TestFailureHandler_PodAlreadyBound(t *testing.T) {
// Add node to schedulerCache no matter it's deleted in API server or not. // Add node to schedulerCache no matter it's deleted in API server or not.
schedulerCache.AddNode(&nodeFoo) schedulerCache.AddNode(&nodeFoo)
s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory) s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -436,14 +438,15 @@ func TestWithPercentageOfNodesToScore(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
defer cancel()
sched, err := New( sched, err := New(
ctx,
client, client,
informerFactory, informerFactory,
nil, nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
stopCh,
WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig), WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig),
) )
if err != nil { if err != nil {
@ -483,16 +486,16 @@ func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v
return nil return nil
} }
func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue internalqueue.SchedulingQueue, func initScheduler(ctx context.Context, cache internalcache.Cache, queue internalqueue.SchedulingQueue,
client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) { client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) {
registerPluginFuncs := []st.RegisterPluginFunc{ registerPluginFuncs := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
} }
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
fwk, err := st.NewFramework(registerPluginFuncs, fwk, err := st.NewFramework(ctx,
registerPluginFuncs,
testSchedulerName, testSchedulerName,
stop,
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
@ -504,7 +507,7 @@ func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue intern
s := &Scheduler{ s := &Scheduler{
Cache: cache, Cache: cache,
client: client, client: client,
StopEverything: stop, StopEverything: ctx.Done(),
SchedulingQueue: queue, SchedulingQueue: queue,
Profiles: profile.Map{testSchedulerName: fwk}, Profiles: profile.Map{testSchedulerName: fwk},
} }
@ -591,9 +594,10 @@ func TestInitPluginsWithIndexers(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
) )
stopCh := make(chan struct{}) _, ctx := ktesting.NewTestContext(t)
defer close(stopCh) ctx, cancel := context.WithCancel(ctx)
_, err := st.NewFramework(registerPluginFuncs, "test", stopCh, frameworkruntime.WithInformerFactory(fakeInformerFactory)) defer cancel()
_, err := st.NewFramework(ctx, registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory))
if len(tt.wantErr) > 0 { if len(tt.wantErr) > 0 {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) { if err == nil || !strings.Contains(err.Error(), tt.wantErr) {

View File

@ -17,6 +17,8 @@ limitations under the License.
package testing package testing
import ( import (
"context"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -28,7 +30,7 @@ import (
var configDecoder = scheme.Codecs.UniversalDecoder() var configDecoder = scheme.Codecs.UniversalDecoder()
// NewFramework creates a Framework from the register functions and options. // NewFramework creates a Framework from the register functions and options.
func NewFramework(fns []RegisterPluginFunc, profileName string, stopCh <-chan struct{}, opts ...runtime.Option) (framework.Framework, error) { func NewFramework(ctx context.Context, fns []RegisterPluginFunc, profileName string, opts ...runtime.Option) (framework.Framework, error) {
registry := runtime.Registry{} registry := runtime.Registry{}
profile := &schedulerapi.KubeSchedulerProfile{ profile := &schedulerapi.KubeSchedulerProfile{
SchedulerName: profileName, SchedulerName: profileName,
@ -37,7 +39,7 @@ func NewFramework(fns []RegisterPluginFunc, profileName string, stopCh <-chan st
for _, f := range fns { for _, f := range fns {
f(&registry, profile) f(&registry, profile)
} }
return runtime.NewFramework(registry, profile, stopCh, opts...) return runtime.NewFramework(ctx, registry, profile, opts...)
} }
// RegisterPluginFunc is a function signature used in method RegisterFilterPlugin() // RegisterPluginFunc is a function signature used in method RegisterFilterPlugin()

View File

@ -98,11 +98,11 @@ func setupWithServerSetup(t *testing.T, serverSetup framework.TestServerSetup) (
}) })
sched, err := scheduler.New( sched, err := scheduler.New(
ctx,
clientSet, clientSet,
informers, informers,
nil, nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
ctx.Done(),
) )
if err != nil { if err != nil {
t.Fatalf("Couldn't create scheduler: %v", err) t.Fatalf("Couldn't create scheduler: %v", err)

View File

@ -81,11 +81,11 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
evtBroadcaster.StartRecordingToSink(ctx.Done()) evtBroadcaster.StartRecordingToSink(ctx.Done())
sched, err := scheduler.New( sched, err := scheduler.New(
ctx,
clientSet, clientSet,
informerFactory, informerFactory,
nil, nil,
profile.NewRecorderFactory(evtBroadcaster), profile.NewRecorderFactory(evtBroadcaster),
ctx.Done(),
scheduler.WithKubeConfig(kubeConfig), scheduler.WithKubeConfig(kubeConfig),
scheduler.WithProfiles(cfg.Profiles...), scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore), scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore),
@ -446,11 +446,11 @@ func InitTestSchedulerWithOptions(
opts = append(opts, scheduler.WithKubeConfig(testCtx.KubeConfig)) opts = append(opts, scheduler.WithKubeConfig(testCtx.KubeConfig))
testCtx.Scheduler, err = scheduler.New( testCtx.Scheduler, err = scheduler.New(
ctx,
testCtx.ClientSet, testCtx.ClientSet,
testCtx.InformerFactory, testCtx.InformerFactory,
testCtx.DynInformerFactory, testCtx.DynInformerFactory,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
ctx.Done(),
opts..., opts...,
) )