Remove potential goroutine leak in NewFramework

Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
kerthcet 2022-08-06 00:05:22 +08:00
parent bd9444c1cf
commit 97e3e50493
19 changed files with 189 additions and 113 deletions

View File

@ -283,8 +283,10 @@ func TestSchedulerWithExtenders(t *testing.T) {
for _, name := range test.nodes {
cache.AddNode(createNode(name))
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
test.registerPlugins, "",
test.registerPlugins, "", ctx.Done(),
runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory),
runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
@ -304,7 +306,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
podIgnored := &v1.Pod{}
result, err := scheduler.SchedulePod(context.Background(), fwk, framework.NewCycleState(), podIgnored)
result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podIgnored)
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)

View File

@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -53,6 +52,9 @@ func TestDefaultBinder(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var gotBinding *v1.Binding
client := fake.NewSimpleClientset(testPod)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
@ -66,12 +68,12 @@ func TestDefaultBinder(t *testing.T) {
return true, gotBinding, nil
})
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithClientSet(client))
fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithClientSet(client))
if err != nil {
t.Fatal(err)
}
binder := &DefaultBinder{handle: fh}
status := binder.Bind(context.Background(), nil, testPod, "foohost.kubernetes.mydomain.com")
status := binder.Bind(ctx, nil, testPod, "foohost.kubernetes.mydomain.com")
if got := status.AsError(); (tt.injectErr != nil) != (got != nil) {
t.Errorf("got error %q, want %q", got, tt.injectErr)
}

View File

@ -351,7 +351,9 @@ func TestPostFilter(t *testing.T) {
if tt.extender != nil {
extenders = append(extenders, tt.extender)
}
f, err := st.NewFramework(registeredPlugins, "",
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := st.NewFramework(registeredPlugins, "", ctx.Done(),
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithInformerFactory(informerFactory),
@ -371,11 +373,11 @@ func TestPostFilter(t *testing.T) {
state := framework.NewCycleState()
// Ensure <state> is populated.
if _, status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
if _, status := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status)
}
gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses)
gotResult, gotStatus := p.PostFilter(ctx, state, tt.pod, tt.filteredNodesStatuses)
// As we cannot compare two errors directly due to miss the equal method for how to compare two errors, so just need to compare the reasons.
if gotStatus.Code() == framework.Error {
if diff := cmp.Diff(tt.wantStatus.Reasons(), gotStatus.Reasons()); diff != "" {
@ -1083,8 +1085,11 @@ func TestDryRunPreemption(t *testing.T) {
// or minCandidateNodesAbsolute. This is only done in a handful of tests.
parallelism = 1
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
registeredPlugins, "",
registeredPlugins, "", ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory),
@ -1094,8 +1099,6 @@ func TestDryRunPreemption(t *testing.T) {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
@ -1125,7 +1128,7 @@ func TestDryRunPreemption(t *testing.T) {
for cycle, pod := range tt.testPods {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() {
if _, status := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() {
t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
}
pe := preemption.Evaluator{
@ -1137,7 +1140,7 @@ func TestDryRunPreemption(t *testing.T) {
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates)
got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, tt.pdbs, offset, numCandidates)
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Victims().Pods
@ -1334,6 +1337,8 @@ func TestSelectBestCandidate(t *testing.T) {
cs := clientsetfake.NewSimpleClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
snapshot := internalcache.NewSnapshot(tt.pods, nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
tt.registerPlugin,
@ -1341,6 +1346,7 @@ func TestSelectBestCandidate(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot),
)
@ -1350,7 +1356,7 @@ func TestSelectBestCandidate(t *testing.T) {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, status := fwk.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status)
}
nodeInfos, err := snapshot.NodeInfos().List()
@ -1373,7 +1379,7 @@ func TestSelectBestCandidate(t *testing.T) {
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
candidates, _, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates)
candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
s := pe.SelectCandidate(candidates)
if s == nil || len(s.Name()) == 0 {
return
@ -1445,7 +1451,9 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
f, err := st.NewFramework(registeredPlugins, "",
stopCh := make(chan struct{})
defer close(stopCh)
f, err := st.NewFramework(registeredPlugins, "", stopCh,
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
)
if err != nil {
@ -1639,10 +1647,10 @@ func TestPreempt(t *testing.T) {
return true, nil, nil
})
stop := make(chan struct{})
defer close(stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cache := internalcache.New(time.Duration(0), stop)
cache := internalcache.New(time.Duration(0), ctx.Done())
for _, pod := range test.pods {
cache.AddPod(pod)
}
@ -1678,6 +1686,7 @@ func TestPreempt(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
ctx.Done(),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
@ -1691,7 +1700,7 @@ func TestPreempt(t *testing.T) {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, s := fwk.RunPreFilterPlugins(context.Background(), state, test.pod); !s.IsSuccess() {
if _, s := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", s)
}
// Call preempt and check the expected results.
@ -1710,7 +1719,7 @@ func TestPreempt(t *testing.T) {
State: state,
Interface: &pl,
}
res, status := pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap))
res, status := pe.Preempt(ctx, test.pod, make(framework.NodeToStatusMap))
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
@ -1746,7 +1755,7 @@ func TestPreempt(t *testing.T) {
}
// Call preempt again and make sure it doesn't preempt any more pods.
res, status = pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap))
res, status = pe.Preempt(ctx, test.pod, make(framework.NodeToStatusMap))
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}

View File

@ -25,7 +25,6 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -332,16 +331,18 @@ func TestImageLocalityPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(nil, test.nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshot := cache.NewSnapshot(nil, test.nodes)
state := framework.NewCycleState()
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
p, _ := New(nil, fh)
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -1133,15 +1132,18 @@ func TestNodeAffinityPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
p, err := New(&test.args, fh)
if err != nil {
t.Fatalf("Creating plugin: %v", err)
}
var status *framework.Status
if !test.disablePreScore {
status = p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)
status = p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
@ -1149,14 +1151,14 @@ func TestNodeAffinityPriority(t *testing.T) {
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -351,10 +350,12 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(test.pods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{})
for i := range test.nodes {
hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)
hostResult, err := p.(framework.ScorePlugin).Score(ctx, nil, test.pod, test.nodes[i].Name)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -746,9 +745,12 @@ func TestFitScore(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
args := test.nodeResourcesFitArgs
p, err := NewFit(&args, fh, plfeature.Features{})
if err != nil {
@ -757,7 +759,7 @@ func TestFitScore(t *testing.T) {
var gotPriorities framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -23,7 +23,6 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -375,9 +374,12 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
p, err := NewFit(
&config.NodeResourcesFitArgs{
@ -396,7 +398,7 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {
var gotScores framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -23,7 +23,6 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@ -331,9 +330,12 @@ func TestMostAllocatedScoringStrategy(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
p, err := NewFit(
&config.NodeResourcesFitArgs{
@ -352,7 +354,7 @@ func TestMostAllocatedScoringStrategy(t *testing.T) {
var gotScores framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -108,9 +108,12 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
p, err := NewFit(&config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{
@ -131,7 +134,7 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) {
var gotScores framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -498,7 +497,7 @@ func TestPreScoreStateEmptyNodes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0)
f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(),
frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)),
frameworkruntime.WithInformerFactory(informerFactory))
if err != nil {
@ -1382,7 +1381,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory := informers.NewSharedInformerFactory(client, 0)
f, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop,
f, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory))
if err != nil {

View File

@ -21,7 +21,6 @@ import (
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
@ -70,7 +69,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
b.Errorf("error waiting for informer cache sync")
}
}
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
pl, err := New(nil, fh)
if err != nil {
b.Fatal(err)

View File

@ -403,7 +403,7 @@ func TestSelectorSpreadScore(t *testing.T) {
if err != nil {
t.Errorf("error creating informerFactory: %+v", err)
}
fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
if err != nil {
t.Errorf("error creating new framework handle: %+v", err)
}

View File

@ -23,7 +23,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -229,26 +228,29 @@ func TestTaintTolerationScore(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(nil, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
p, _ := New(nil, fh)
status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)
status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}

View File

@ -279,8 +279,10 @@ func TestDryRunPreemption(t *testing.T) {
}
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
parallelism := parallelize.DefaultParallelism
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
registeredPlugins, "",
registeredPlugins, "", ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithParallelism(parallelism),
@ -290,8 +292,6 @@ func TestDryRunPreemption(t *testing.T) {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
snapshot := internalcache.NewSnapshot(tt.initPods, tt.nodes)

View File

@ -378,7 +378,7 @@ var (
errInjectedFilterStatus = errors.New("injected filter status")
)
func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) {
if _, ok := r[queueSortPlugin]; !ok {
r[queueSortPlugin] = newQueueSortPlugin
}
@ -392,7 +392,7 @@ func newFrameworkWithQueueSortAndBind(r Registry, profile config.KubeSchedulerPr
if len(profile.Plugins.Bind.Enabled) == 0 {
profile.Plugins.Bind.Enabled = append(profile.Plugins.Bind.Enabled, config.Plugin{Name: bindPlugin})
}
return NewFramework(r, &profile, wait.NeverStop, opts...)
return NewFramework(r, &profile, stopCh, opts...)
}
func TestInitFrameworkWithScorePlugins(t *testing.T) {
@ -433,7 +433,9 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: tt.plugins}
_, err := newFrameworkWithQueueSortAndBind(registry, profile)
stopCh := make(chan struct{})
defer close(stopCh)
_, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh)
if tt.initErr && err == nil {
t.Fatal("Framework initialization should fail")
}
@ -797,7 +799,9 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, wait.NeverStop)
stopCh := make(chan struct{})
defer close(stopCh)
fw, err := NewFramework(registry, &config.KubeSchedulerProfile{Plugins: tc.plugins}, stopCh)
if err != nil {
if tc.wantErr == "" || !strings.Contains(err.Error(), tc.wantErr) {
t.Fatalf("Unexpected error, got %v, expect: %s", err, tc.wantErr)
@ -962,7 +966,9 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
got := make(map[framework.ClusterEvent]sets.String)
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
_, err := newFrameworkWithQueueSortAndBind(registry, profile, WithClusterEventMap(got))
stopCh := make(chan struct{})
defer close(stopCh)
_, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh, WithClusterEventMap(got))
if err != nil {
t.Fatal(err)
}
@ -1167,12 +1173,14 @@ func TestRunScorePlugins(t *testing.T) {
Plugins: tt.plugins,
PluginConfig: tt.pluginConfigs,
}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done())
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
res, status := f.RunScorePlugins(context.Background(), state, pod, nodes)
res, status := f.RunScorePlugins(ctx, state, pod, nodes)
if tt.err {
if status.IsSuccess() {
@ -1206,13 +1214,16 @@ func TestPreFilterPlugins(t *testing.T) {
plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithExtensionsPluginName}, {Name: preFilterPluginName}}}}
t.Run("TestPreFilterPlugin", func(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: plugins}
f, err := newFrameworkWithQueueSortAndBind(r, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done())
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPreFilterPlugins(context.Background(), nil, nil)
f.RunPreFilterExtensionAddPod(context.Background(), nil, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(context.Background(), nil, nil, nil, nil)
f.RunPreFilterPlugins(ctx, nil, nil)
f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil)
if preFilter1.PreFilterCalled != 1 {
t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled)
@ -1396,11 +1407,13 @@ func TestFilterPlugins(t *testing.T) {
config.Plugin{Name: pl.name})
}
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done())
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
gotStatusMap := f.RunFilterPlugins(context.TODO(), nil, pod, nil)
gotStatusMap := f.RunFilterPlugins(ctx, nil, pod, nil)
gotStatus := gotStatusMap.Merge()
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("wrong status code. got: %v, want:%v", gotStatus, tt.wantStatus)
@ -1478,11 +1491,13 @@ func TestPostFilterPlugins(t *testing.T) {
)
}
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done())
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
_, gotStatus := f.RunPostFilterPlugins(context.TODO(), nil, pod, nil)
_, gotStatus := f.RunPostFilterPlugins(ctx, nil, pod, nil)
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
}
@ -1625,12 +1640,14 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: nodeName})
}
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
f, err := newFrameworkWithQueueSortAndBind(registry, profile, WithPodNominator(podNominator))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done(), WithPodNominator(podNominator))
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
tt.nodeInfo.SetNode(tt.node)
gotStatus := f.RunFilterPluginsWithNominatedPods(context.TODO(), nil, tt.pod, tt.nodeInfo)
gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo)
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
}
@ -1780,12 +1797,14 @@ func TestPreBindPlugins(t *testing.T) {
)
}
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done())
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
status := f.RunPreBindPlugins(context.TODO(), nil, pod, "")
status := f.RunPreBindPlugins(ctx, nil, pod, "")
if !reflect.DeepEqual(status, tt.wantStatus) {
t.Errorf("wrong status code. got %v, want %v", status, tt.wantStatus)
@ -1936,12 +1955,14 @@ func TestReservePlugins(t *testing.T) {
)
}
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done())
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
status := f.RunReservePluginsReserve(context.TODO(), nil, pod, "")
status := f.RunReservePluginsReserve(ctx, nil, pod, "")
if !reflect.DeepEqual(status, tt.wantStatus) {
t.Errorf("wrong status code. got %v, want %v", status, tt.wantStatus)
@ -2060,12 +2081,14 @@ func TestPermitPlugins(t *testing.T) {
)
}
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done())
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
status := f.RunPermitPlugins(context.TODO(), nil, pod, "")
status := f.RunPermitPlugins(ctx, nil, pod, "")
if !reflect.DeepEqual(status, tt.want) {
t.Errorf("wrong status code. got %v, want %v", status, tt.want)
@ -2236,8 +2259,9 @@ func TestRecordingMetrics(t *testing.T) {
SchedulerName: testProfileName,
Plugins: plugins,
}
f, err := newFrameworkWithQueueSortAndBind(r, profile, withMetricsRecorder(recorder))
f, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder))
if err != nil {
close(stopCh)
t.Fatalf("Failed to create framework for testing: %v", err)
}
@ -2346,8 +2370,9 @@ func TestRunBindPlugins(t *testing.T) {
SchedulerName: testProfileName,
Plugins: plugins,
}
fwk, err := newFrameworkWithQueueSortAndBind(r, profile, withMetricsRecorder(recorder))
fwk, err := newFrameworkWithQueueSortAndBind(r, profile, stopCh, withMetricsRecorder(recorder))
if err != nil {
close(stopCh)
t.Fatal(err)
}
@ -2400,13 +2425,15 @@ func TestPermitWaitDurationMetric(t *testing.T) {
Permit: config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}},
}
profile := config.KubeSchedulerProfile{Plugins: plugins}
f, err := newFrameworkWithQueueSortAndBind(r, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done())
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPermitPlugins(context.TODO(), nil, pod, "")
f.WaitOnPermit(context.TODO(), pod)
f.RunPermitPlugins(ctx, nil, pod, "")
f.WaitOnPermit(ctx, pod)
collectAndComparePermitWaitDuration(t, tt.wantRes)
})
@ -2454,12 +2481,14 @@ func TestWaitOnPermit(t *testing.T) {
Permit: config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}},
}
profile := config.KubeSchedulerProfile{Plugins: plugins}
f, err := newFrameworkWithQueueSortAndBind(r, profile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done())
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
runPermitPluginsStatus := f.RunPermitPlugins(context.Background(), nil, pod, "")
runPermitPluginsStatus := f.RunPermitPlugins(ctx, nil, pod, "")
if runPermitPluginsStatus.Code() != framework.Wait {
t.Fatalf("Expected RunPermitPlugins to return status %v, but got %v",
framework.Wait, runPermitPluginsStatus.Code())
@ -2467,7 +2496,7 @@ func TestWaitOnPermit(t *testing.T) {
go tt.action(f)
got := f.WaitOnPermit(context.Background(), pod)
got := f.WaitOnPermit(ctx, pod)
if !reflect.DeepEqual(tt.want, got) {
t.Errorf("Unexpected status: want %v, but got %v", tt.want, got)
}
@ -2505,7 +2534,9 @@ func TestListPlugins(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
profile := config.KubeSchedulerProfile{Plugins: tt.plugins}
f, err := newFrameworkWithQueueSortAndBind(registry, profile)
stopCh := make(chan struct{})
defer close(stopCh)
f, err := newFrameworkWithQueueSortAndBind(registry, profile, stopCh)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}

View File

@ -573,17 +573,17 @@ func TestSchedulerScheduleOne(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(registerPluginFuncs,
testSchedulerName,
ctx.Done(),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newScheduler(
cache,
nil,
@ -1046,22 +1046,22 @@ func TestSchedulerBinding(t *testing.T) {
}
return false, nil, nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework([]st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, "", frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
}, "", ctx.Done(), frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
if err != nil {
t.Fatal(err)
}
stop := make(chan struct{})
defer close(stop)
sched := &Scheduler{
Extenders: test.extenders,
Cache: internalcache.New(100*time.Millisecond, stop),
Cache: internalcache.New(100*time.Millisecond, ctx.Done()),
nodeInfoSnapshot: nil,
percentageOfNodesToScore: 0,
}
err = sched.bind(context.Background(), fwk, pod, "node", nil)
err = sched.bind(ctx, fwk, pod, "node", nil)
if err != nil {
t.Error(err)
}
@ -2012,7 +2012,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
}
snapshot := internalcache.NewSnapshot(test.pods, nodes)
fwk, err := st.NewFramework(
test.registerPlugins, "",
test.registerPlugins, "", ctx.Done(),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
@ -2063,6 +2063,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
func TestFindFitAllError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
@ -2071,13 +2074,14 @@ func TestFindFitAllError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
t.Fatal(err)
}
_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
_, diagnosis, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -2098,6 +2102,9 @@ func TestFindFitAllError(t *testing.T) {
func TestFindFitSomeError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
@ -2106,6 +2113,7 @@ func TestFindFitSomeError(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
@ -2113,7 +2121,7 @@ func TestFindFitSomeError(t *testing.T) {
}
pod := st.MakePod().Name("1").UID("1").Obj()
_, diagnosis, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod)
_, diagnosis, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -2177,8 +2185,10 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
registerFakeFilterFunc,
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
registerPlugins, "",
registerPlugins, "", ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
@ -2192,7 +2202,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
fwk.AddNominatedPod(framework.NewPodInfo(st.MakePod().UID("nominated").Priority(midPriority).Obj()),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -2311,8 +2321,10 @@ func TestZeroRequest(t *testing.T) {
st.RegisterPreScorePlugin(selectorspread.Name, selectorspread.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
pluginRegistrations, "",
pluginRegistrations, "", ctx.Done(),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithClientSet(client),
@ -2333,7 +2345,6 @@ func TestZeroRequest(t *testing.T) {
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
ctx := context.Background()
state := framework.NewCycleState()
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod)
if err != nil {
@ -2416,6 +2427,8 @@ func TestFairEvaluationForNodes(t *testing.T) {
}
nodes := makeNodeList(nodeNames)
sched := makeScheduler(nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
@ -2423,6 +2436,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
)
if err != nil {
@ -2435,7 +2449,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
// Iterating over all nodes more than twice
for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
nodesThatFit, _, err := sched.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{})
nodesThatFit, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -2496,8 +2510,10 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
registerFakeFilterFunc,
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
registerPlugins, "",
registerPlugins, "", ctx.Done(),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
@ -2516,7 +2532,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod)
_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -2657,6 +2673,7 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c
fwk, _ := st.NewFramework(
fns,
testSchedulerName,
ctx.Done(),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(recorder),
frameworkruntime.WithInformerFactory(informerFactory),

View File

@ -451,6 +451,7 @@ func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue intern
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
fwk, err := st.NewFramework(registerPluginFuncs,
testSchedulerName,
stop,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
@ -552,7 +553,9 @@ func TestInitPluginsWithIndexers(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
_, err := st.NewFramework(registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory))
stopCh := make(chan struct{})
defer close(stopCh)
_, err := st.NewFramework(registerPluginFuncs, "test", stopCh, frameworkruntime.WithInformerFactory(fakeInformerFactory))
if len(tt.wantErr) > 0 {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {

View File

@ -18,7 +18,6 @@ package testing
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kube-scheduler/config/v1beta2"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
@ -29,7 +28,7 @@ import (
var configDecoder = scheme.Codecs.UniversalDecoder()
// NewFramework creates a Framework from the register functions and options.
func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime.Option) (framework.Framework, error) {
func NewFramework(fns []RegisterPluginFunc, profileName string, stopCh <-chan struct{}, opts ...runtime.Option) (framework.Framework, error) {
registry := runtime.Registry{}
profile := &schedulerapi.KubeSchedulerProfile{
SchedulerName: profileName,
@ -38,7 +37,7 @@ func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime.
for _, f := range fns {
f(&registry, profile)
}
return runtime.NewFramework(registry, profile, wait.NeverStop, opts...)
return runtime.NewFramework(registry, profile, stopCh, opts...)
}
// RegisterPluginFunc is a function signature used in method RegisterFilterPlugin()