sched: fix goroutine leak in unit tests

This commit is contained in:
Wei Huang 2022-01-22 11:00:56 -08:00
parent be8a8482ed
commit 939e98135c
No known key found for this signature in database
GPG Key ID: 17AFE05D01EA77B2
9 changed files with 165 additions and 89 deletions

View File

@ -422,10 +422,11 @@ func TestAddAllEventHandlers(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testSched := Scheduler{
StopEverything: stopCh,
SchedulingQueue: queue.NewTestQueue(context.Background(), nil),
StopEverything: ctx.Done(),
SchedulingQueue: queue.NewTestQueue(ctx, nil),
}
client := fake.NewSimpleClientset()

View File

@ -995,7 +995,8 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node})
fts := feature.Features{EnablePodAffinityNamespaceSelector: !test.disableNSSelector}
p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(fts, New), &config.InterPodAffinityArgs{}, snapshot, namespaces)
@ -1860,7 +1861,8 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
for indexTest, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshot := cache.NewSnapshot(test.pods, test.nodes)
p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot,
[]runtime.Object{

View File

@ -742,7 +742,8 @@ func TestPreferredAffinity(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
fts := feature.Features{EnablePodAffinityNamespaceSelector: !test.disableNSSelector}
p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(fts, New), &config.InterPodAffinityArgs{HardPodAffinityWeight: 1}, cache.NewSnapshot(test.pods, test.nodes), namespaces)
@ -904,7 +905,8 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
state := framework.NewCycleState()
fts := feature.Features{EnablePodAffinityNamespaceSelector: !test.disableNSSelector}
p := plugintesting.SetupPluginWithInformers(ctx, t, frameworkruntime.FactoryAdapter(fts, New), &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, cache.NewSnapshot(test.pods, test.nodes), namespaces)

View File

@ -513,7 +513,8 @@ func TestPreFilterState(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
args := &config.PodTopologySpreadArgs{
DefaultConstraints: tt.defaultConstraints,
DefaultingType: config.ListDefaulting,

View File

@ -244,7 +244,8 @@ func TestPreScoreStateEmptyNodes(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.objs...), 0)
f, err := frameworkruntime.NewFramework(nil, nil,
frameworkruntime.WithSnapshotSharedLister(cache.NewSnapshot(nil, tt.nodes)),
@ -869,7 +870,8 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
client := fake.NewSimpleClientset(
&v1.Service{Spec: v1.ServiceSpec{Selector: map[string]string{"foo": ""}}},
)
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory := informers.NewSharedInformerFactory(client, 0)
f, err := frameworkruntime.NewFramework(nil, nil,
frameworkruntime.WithSnapshotSharedLister(snapshot),

View File

@ -73,7 +73,9 @@ func TestGCEDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@ -121,7 +123,9 @@ func TestAWSDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@ -175,7 +179,9 @@ func TestRBDDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@ -229,7 +235,9 @@ func TestISCSIDiskConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPlugin(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
@ -355,7 +363,9 @@ func TestAccessModeConflicts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := newPluginWithListers(t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus)
@ -364,12 +374,11 @@ func TestAccessModeConflicts(t *testing.T) {
}
}
func newPlugin(t *testing.T) framework.Plugin {
return newPluginWithListers(t, nil, nil, nil, true)
func newPlugin(ctx context.Context, t *testing.T) framework.Plugin {
return newPluginWithListers(ctx, t, nil, nil, nil, true)
}
func newPluginWithListers(t *testing.T, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, enableReadWriteOncePod bool) framework.Plugin {
ctx := context.Background()
func newPluginWithListers(ctx context.Context, t *testing.T, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, enableReadWriteOncePod bool) framework.Plugin {
pluginFactory := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return New(plArgs, fh, feature.Features{
EnableReadWriteOncePod: enableReadWriteOncePod,

View File

@ -976,7 +976,8 @@ func TestGenericScheduler(t *testing.T) {
cache.AddNode(node)
}
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cs := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(cs, 0)
for _, pvc := range test.pvcs {

View File

@ -131,7 +131,9 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
func TestPriorityQueue_Add(t *testing.T) {
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -174,7 +176,9 @@ func newDefaultQueueSort() framework.LessFunc {
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
objs := []runtime.Object{medPriorityPodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -191,7 +195,9 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
objs := []runtime.Object{highPriNominatedPodInfo.Pod, unschedulablePodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
q.Add(highPriNominatedPodInfo.Pod)
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle())
@ -223,7 +229,9 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// Pods in and before current scheduling cycle will be put back to activeQueue
// if we were trying to schedule them when we received move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(time.Now())))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(time.Now())))
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
@ -289,7 +297,9 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
func TestPriorityQueue_Pop(t *testing.T) {
objs := []runtime.Object{medPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
@ -308,7 +318,9 @@ func TestPriorityQueue_Pop(t *testing.T) {
func TestPriorityQueue_Update(t *testing.T) {
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod}
c := testingclock.NewFakeClock(time.Now())
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs, WithClock(c))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs, WithClock(c))
q.Update(nil, highPriorityPodInfo.Pod)
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name)
@ -389,7 +401,9 @@ func TestPriorityQueue_Update(t *testing.T) {
func TestPriorityQueue_Delete(t *testing.T) {
objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod)
if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil {
@ -449,7 +463,9 @@ func TestPriorityQueue_Activate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var objs []runtime.Object
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
// Prepare activeQ/unschedulableQ/podBackoffQ according to the table
for _, qPodInfo := range tt.qPodInfoInActiveQ {
@ -554,7 +570,9 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
}
}
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
// Init pods in unschedulableQ.
for j := 0; j < podsInUnschedulableQ; j++ {
@ -600,7 +618,9 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
m := map[framework.ClusterEvent]sets.String{
{Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fooPlugin"),
}
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q.Add(medPriorityPodInfo.Pod)
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
@ -687,7 +707,9 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
m := map[framework.ClusterEvent]sets.String{AssignedPodAdd: sets.NewString("fakePlugin")}
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q.Add(medPriorityPodInfo.Pod)
// Add a couple of pods to the unschedulableQ.
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
@ -712,7 +734,9 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
q.Add(medPriorityPodInfo.Pod)
q.Add(unschedulablePodInfo.Pod)
q.Add(highPriorityPodInfo.Pod)
@ -766,7 +790,8 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) {
// Build a PriorityQueue.
q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewPodNominator(podLister)))
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
@ -793,7 +818,9 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
return pendingSet
}
q := NewTestQueue(context.Background(), newDefaultQueueSort())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
q.Add(medPriorityPodInfo.Pod)
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle())
@ -811,7 +838,9 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
q := NewTestQueueWithObjects(context.Background(), newDefaultQueueSort(), objs)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
if err := q.Add(medPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -882,7 +911,9 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
}
func TestPriorityQueue_NewWithOptions(t *testing.T) {
q := NewTestQueue(context.Background(),
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx,
newDefaultQueueSort(),
WithPodInitialBackoffDuration(2*time.Second),
WithPodMaxBackoffDuration(20*time.Second),
@ -1048,7 +1079,9 @@ func TestUnschedulablePodsMap(t *testing.T) {
}
func TestSchedulingQueue_Close(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
wantErr := fmt.Errorf(queueClosed)
wg := sync.WaitGroup{}
wg.Add(1)
@ -1072,7 +1105,9 @@ func TestSchedulingQueue_Close(t *testing.T) {
// are frequent events that move pods to the active queue.
func TestRecentlyTriedPodsGoBack(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
// Add a few pods to priority queue.
for i := 0; i < 5; i++ {
p := v1.Pod{
@ -1129,7 +1164,9 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// are frequent events that move pods to the active queue.
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
// Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule
@ -1220,7 +1257,9 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// TestHighPriorityBackoff tests that a high priority pod does not block
// other pods if it is unschedulable
func TestHighPriorityBackoff(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -1287,7 +1326,9 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
m := map[framework.ClusterEvent]sets.String{
NodeAdd: sets.NewString("fakePlugin"),
}
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod",
@ -1471,7 +1512,9 @@ func TestPodTimestamp(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)))
var podInfoList []*framework.QueuedPodInfo
for i, op := range test.operations {
@ -1628,7 +1671,9 @@ scheduler_pending_pods{queue="unschedulable"} 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resetMetrics()
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
@ -1657,7 +1702,9 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
// Add -> Pop.
c := testingclock.NewFakeClock(timestamp)
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err := queue.Pop()
if err != nil {
@ -1668,7 +1715,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
c = testingclock.NewFakeClock(timestamp)
queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
queue = NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1688,7 +1735,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 3: Similar to case 2, but before the second pop, call update, the queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
c = testingclock.NewFakeClock(timestamp)
queue = NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(c))
queue = NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1786,7 +1833,9 @@ func TestIncomingPodsMetrics(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
metrics.SchedulerQueueIncomingPods.Reset()
queue := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)))
for _, op := range test.operations {
for _, pInfo := range pInfos {
op(queue, pInfo)
@ -1812,7 +1861,9 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Qu
func TestBackOffFlow(t *testing.T) {
cl := testingclock.NewFakeClock(time.Now())
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithClock(cl))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(cl))
steps := []struct {
wantBackoff time.Duration
}{
@ -1971,7 +2022,9 @@ func TestPodMatchesEvent(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
q.clusterEventMap = tt.clusterEventMap
if got := q.podMatchesEvent(tt.podInfo, tt.event); got != tt.want {
t.Errorf("Want %v, but got %v", tt.want, got)
@ -2026,7 +2079,9 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
for _, podInfo := range tt.podInfos {
q.AddUnschedulableIfNotPresent(podInfo, q.schedulingCycle)
}
@ -2100,7 +2155,9 @@ func TestPriorityQueue_calculateBackoffDuration(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort(), WithPodInitialBackoffDuration(tt.initialBackoffDuration), WithPodMaxBackoffDuration(tt.maxBackoffDuration))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithPodInitialBackoffDuration(tt.initialBackoffDuration), WithPodMaxBackoffDuration(tt.maxBackoffDuration))
if got := q.calculateBackoffDuration(tt.podInfo); got != tt.want {
t.Errorf("PriorityQueue.calculateBackoffDuration() = %v, want %v", got, tt.want)
}

View File

@ -438,6 +438,8 @@ func TestSchedulerScheduleOne(t *testing.T) {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := &Scheduler{
SchedulerCache: sCache,
Algorithm: item.algo,
@ -452,7 +454,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
Profiles: profile.Map{
testSchedulerName: fwk,
},
SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil),
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
}
called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@ -462,7 +464,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
}
close(called)
})
s.scheduleOne(context.Background())
s.scheduleOne(ctx)
<-called
if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
t.Errorf("assumed pod: wanted %v, got %v", e, a)
@ -638,10 +640,10 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
}
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := internalcache.New(100*time.Millisecond, stop)
scache := internalcache.New(100*time.Millisecond, ctx.Done())
pod := podWithPort("pod.Name", "", 8080)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node)
@ -653,7 +655,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
}
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, pod, &node, fns...)
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, pod, &node, fns...)
waitPodExpireChan := make(chan struct{})
timeout := make(chan struct{})
@ -689,7 +691,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
// We use conflicted pod ports to incur fit predicate failure if first pod not removed.
secondPod := podWithPort("bar", "", 8080)
queuedPodStore.Add(secondPod)
scheduler.scheduleOne(context.Background())
scheduler.scheduleOne(ctx)
select {
case b := <-bindingChan:
expectBinding := &v1.Binding{
@ -705,10 +707,10 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
}
func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := internalcache.New(10*time.Minute, stop)
scache := internalcache.New(10*time.Minute, ctx.Done())
firstPod := podWithPort("pod.Name", "", 8080)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node)
@ -719,7 +721,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
}
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, firstPod, &node, fns...)
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, firstPod, &node, fns...)
// We use conflicted pod ports to incur fit predicate failure.
secondPod := podWithPort("bar", "", 8080)
@ -727,7 +729,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
// queuedPodStore: [bar:8080]
// cache: [(assumed)foo:8080]
scheduler.scheduleOne(context.Background())
scheduler.scheduleOne(ctx)
select {
case err := <-errChan:
expectErr := &framework.FitError{
@ -760,7 +762,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
}
queuedPodStore.Add(secondPod)
scheduler.scheduleOne(context.Background())
scheduler.scheduleOne(ctx)
select {
case b := <-bindingChan:
expectBinding := &v1.Binding{
@ -777,19 +779,19 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
informerFactory informers.SharedInformerFactory, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
queuedPodStore.Add(pod)
// queuedPodStore: [foo:8080]
// cache: []
scheduler.scheduleOne(context.Background())
scheduler.scheduleOne(ctx)
// queuedPodStore: []
// cache: [(assumed)foo:8080]
@ -809,10 +811,10 @@ func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcach
}
func TestSchedulerFailedSchedulingReasons(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := internalcache.New(10*time.Minute, stop)
scache := internalcache.New(10*time.Minute, ctx.Done())
// Design the baseline for the pods, and we will make nodes that don't fit it later.
var cpu = int64(4)
@ -865,13 +867,13 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(noderesources.Name, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit), "Filter", "PreFilter"),
}
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
queuedPodStore.Add(podWithTooBigResourceRequests)
scheduler.scheduleOne(context.Background())
scheduler.scheduleOne(ctx)
select {
case err := <-errChan:
expectErr := &framework.FitError{
@ -895,7 +897,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
bindingChan := make(chan *v1.Binding, 1)
client := clientsetfake.NewSimpleClientset()
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
@ -943,13 +945,13 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
testSchedulerName: fwk,
},
client: client,
SchedulingQueue: internalqueue.NewTestQueue(context.Background(), nil),
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
}
return sched, bindingChan, errChan
}
func setupTestSchedulerWithVolumeBinding(volumeBinder volumebinding.SchedulerVolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestSchedulerWithVolumeBinding(ctx context.Context, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
pod := podWithID("foo", "")
@ -957,7 +959,7 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder volumebinding.SchedulerVol
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol",
VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}})
queuedPodStore.Add(pod)
scache := internalcache.New(10*time.Minute, stop)
scache := internalcache.New(10*time.Minute, ctx.Done())
scache.AddNode(&testNode)
testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
@ -972,9 +974,9 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder volumebinding.SchedulerVol
return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil
}, "PreFilter", "Filter", "Reserve", "PreBind"),
}
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
s, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, broadcaster, fns...)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
return s, bindingChan, errChan
}
@ -1079,9 +1081,10 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
for _, item := range table {
t.Run(item.name, func(t *testing.T) {
stop := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig)
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, fakeVolumeBinder, eventBroadcaster)
eventChan := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event)
@ -1090,7 +1093,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
}
close(eventChan)
})
s.scheduleOne(context.Background())
s.scheduleOne(ctx)
// Wait for pod to succeed or fail scheduling
select {
case <-eventChan:
@ -1127,8 +1130,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
if item.expectBindCalled != fakeVolumeBinder.BindCalled {
t.Errorf("expectedBindCall %v", item.expectBindCalled)
}
close(stop)
})
}
}