diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 8b4308894ce..b61b4866960 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -371,8 +371,9 @@ func TestAddAllEventHandlers(t *testing.T) { name: "default handlers in framework", gvkMap: map[framework.GVK]framework.ActionType{}, expectStaticInformers: map[reflect.Type]bool{ - reflect.TypeOf(&v1.Pod{}): true, - reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Namespace{}): true, }, expectDynamicInformers: map[schema.GroupVersionResource]bool{}, }, @@ -386,6 +387,7 @@ func TestAddAllEventHandlers(t *testing.T) { expectStaticInformers: map[reflect.Type]bool{ reflect.TypeOf(&v1.Pod{}): true, reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Namespace{}): true, reflect.TypeOf(&v1.PersistentVolume{}): true, reflect.TypeOf(&storagev1beta1.CSIStorageCapacity{}): true, }, @@ -398,8 +400,9 @@ func TestAddAllEventHandlers(t *testing.T) { "cronjobs.v1.batch": framework.Delete, }, expectStaticInformers: map[reflect.Type]bool{ - reflect.TypeOf(&v1.Pod{}): true, - reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Namespace{}): true, }, expectDynamicInformers: map[schema.GroupVersionResource]bool{ {Group: "apps", Version: "v1", Resource: "daemonsets"}: true, @@ -413,8 +416,9 @@ func TestAddAllEventHandlers(t *testing.T) { "custommetrics.v1beta1": framework.Update, }, expectStaticInformers: map[reflect.Type]bool{ - reflect.TypeOf(&v1.Pod{}): true, - reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.Namespace{}): true, }, expectDynamicInformers: map[schema.GroupVersionResource]bool{ {Group: "apps", Version: "v1", Resource: "daemonsets"}: true, @@ -433,13 +437,14 @@ func TestAddAllEventHandlers(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + schedulingQueue := queue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) testSched := Scheduler{ StopEverything: ctx.Done(), - SchedulingQueue: queue.NewTestQueue(ctx, nil), + SchedulingQueue: schedulingQueue, } - client := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) dynclient := dyfake.NewSimpleDynamicClient(scheme) dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0) diff --git a/pkg/scheduler/internal/queue/testing.go b/pkg/scheduler/internal/queue/testing.go index 7a11bc45f1c..e0ee28312e2 100644 --- a/pkg/scheduler/internal/queue/testing.go +++ b/pkg/scheduler/internal/queue/testing.go @@ -25,6 +25,11 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" ) +// NewTestQueue creates a priority queue with an empty informer factory. +func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue { + return NewTestQueueWithObjects(ctx, lessFn, nil, opts...) +} + // NewTestQueueWithObjects creates a priority queue with an informer factory // populated with the provided objects. func NewTestQueueWithObjects( @@ -34,13 +39,17 @@ func NewTestQueueWithObjects( opts ...Option, ) *PriorityQueue { informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0) + return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...) +} + +func NewTestQueueWithInformerFactory( + ctx context.Context, + lessFn framework.LessFunc, + informerFactory informers.SharedInformerFactory, + opts ...Option, +) *PriorityQueue { pq := NewPriorityQueue(lessFn, informerFactory, opts...) informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) return pq } - -// NewTestQueue creates a priority queue with an empty informer factory. -func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue { - return NewTestQueueWithObjects(ctx, lessFn, nil, opts...) -} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 2a77bc6d540..cf9efc2bffd 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" clienttesting "k8s.io/client-go/testing" @@ -390,13 +391,6 @@ func TestSchedulerScheduleOne(t *testing.T) { }, } - stop := make(chan struct{}) - defer close(stop) - informerFactory := informers.NewSharedInformerFactory(client, 0) - - informerFactory.Start(stop) - informerFactory.WaitForCacheSync(stop) - for _, item := range table { t.Run(item.name, func(t *testing.T) { var gotError error @@ -440,6 +434,7 @@ func TestSchedulerScheduleOne(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + s := &Scheduler{ SchedulerCache: sCache, Algorithm: item.algo, @@ -647,15 +642,13 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { pod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) - client := clientsetfake.NewSimpleClientset(&node) - informerFactory := informers.NewSharedInformerFactory(client, 0) fns := []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"), } - scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, pod, &node, fns...) + scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, pod, &node, fns...) waitPodExpireChan := make(chan struct{}) timeout := make(chan struct{}) @@ -714,14 +707,12 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { firstPod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) - client := clientsetfake.NewSimpleClientset(&node) - informerFactory := informers.NewSharedInformerFactory(client, 0) fns := []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"), } - scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, firstPod, &node, fns...) + scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, firstPod, &node, fns...) // We use conflicted pod ports to incur fit predicate failure. secondPod := podWithPort("bar", "", 8080) @@ -780,12 +771,8 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, - informerFactory informers.SharedInformerFactory, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { - - scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...) - - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) + pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { + scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, nil, nil, fns...) queuedPodStore.Add(pod) // queuedPodStore: [foo:8080] @@ -850,8 +837,6 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { nodes = append(nodes, &node) objects = append(objects, &node) } - client := clientsetfake.NewSimpleClientset(objects...) - informerFactory := informers.NewSharedInformerFactory(client, 0) // Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary. failedNodeStatues := framework.NodeToStatusMap{} @@ -867,10 +852,9 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(noderesources.Name, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit), "Filter", "PreFilter"), } - scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...) - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) + informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objects...), 0) + scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...) queuedPodStore.Add(podWithTooBigResourceRequests) scheduler.scheduleOne(ctx) @@ -916,6 +900,11 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s recorder = &events.FakeRecorder{} } + if informerFactory == nil { + informerFactory = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) + } + schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) + fwk, _ := st.NewFramework( fns, testSchedulerName, @@ -945,7 +934,7 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s testSchedulerName: fwk, }, client: client, - SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), + SchedulingQueue: schedulingQueue, } return sched, bindingChan, errChan @@ -975,8 +964,6 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, volumeBinder volum }, "PreFilter", "Filter", "Reserve", "PreBind"), } s, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, broadcaster, fns...) - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) return s, bindingChan, errChan }