diff --git a/cmd/kube-scheduler/app/config/BUILD b/cmd/kube-scheduler/app/config/BUILD index 79ddd23c8a1..cdd87b7b8c1 100644 --- a/cmd/kube-scheduler/app/config/BUILD +++ b/cmd/kube-scheduler/app/config/BUILD @@ -9,7 +9,6 @@ go_library( "//pkg/scheduler/apis/config:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library", diff --git a/cmd/kube-scheduler/app/config/config.go b/cmd/kube-scheduler/app/config/config.go index dc85e6ab6eb..9ee3dcb0fc9 100644 --- a/cmd/kube-scheduler/app/config/config.go +++ b/cmd/kube-scheduler/app/config/config.go @@ -19,7 +19,6 @@ package config import ( apiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/informers" - coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" @@ -43,7 +42,6 @@ type Config struct { Client clientset.Interface InformerFactory informers.SharedInformerFactory - PodInformer coreinformers.PodInformer //lint:ignore SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done. EventBroadcaster events.EventBroadcasterAdapter diff --git a/cmd/kube-scheduler/app/options/BUILD b/cmd/kube-scheduler/app/options/BUILD index 0465bf688cd..72577489c51 100644 --- a/cmd/kube-scheduler/app/options/BUILD +++ b/cmd/kube-scheduler/app/options/BUILD @@ -26,7 +26,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", diff --git a/cmd/kube-scheduler/app/options/options.go b/cmd/kube-scheduler/app/options/options.go index 4549287b4bd..ebed9e4c521 100644 --- a/cmd/kube-scheduler/app/options/options.go +++ b/cmd/kube-scheduler/app/options/options.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" apiserveroptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -287,8 +286,7 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) { } c.Client = client - c.InformerFactory = informers.NewSharedInformerFactory(client, 0) - c.PodInformer = scheduler.NewPodInformer(client, 0) + c.InformerFactory = scheduler.NewInformerFactory(client, 0) c.LeaderElection = leaderElectionConfig return c, nil diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 2d628ef373d..4b3614bb90b 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -185,7 +185,6 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * } // Start all informers. - go cc.PodInformer.Informer().Run(ctx.Done()) cc.InformerFactory.Start(ctx.Done()) // Wait for all caches to sync before scheduling. @@ -316,7 +315,6 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions // Create the scheduler. sched, err := scheduler.New(cc.Client, cc.InformerFactory, - cc.PodInformer, recorderFactory, ctx.Done(), scheduler.WithProfiles(cc.ComponentConfig.Profiles...), diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index f6f77557d85..2bda90dd759 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -40,7 +40,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index 18644c422ed..f22e5db6ba6 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -1338,7 +1338,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { sched, err := scheduler.New( client, informerFactory, - informerFactory.Core().V1().Pods(), recorderFactory, make(chan struct{}), scheduler.WithAlgorithmSource(algorithmSrc), @@ -1514,7 +1513,6 @@ func TestAlgorithmProviderCompatibility(t *testing.T) { sched, err := scheduler.New( client, informerFactory, - informerFactory.Core().V1().Pods(), recorderFactory, make(chan struct{}), opts..., @@ -1943,7 +1941,6 @@ func TestPluginsConfigurationCompatibility(t *testing.T) { sched, err := scheduler.New( client, informerFactory, - informerFactory.Core().V1().Pods(), recorderFactory, make(chan struct{}), scheduler.WithProfiles(config.KubeSchedulerProfile{ diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 1809fc84645..9abfab6334a 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -27,7 +27,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" - coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -362,10 +361,9 @@ func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool { func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, - podInformer coreinformers.PodInformer, ) { // scheduled pod cache - podInformer.Informer().AddEventHandler( + informerFactory.Core().V1().Pods().Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { @@ -390,7 +388,7 @@ func addAllEventHandlers( }, ) // unscheduled pod queue - podInformer.Informer().AddEventHandler( + informerFactory.Core().V1().Pods().Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 8165968d28a..1e63fec98aa 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -27,14 +27,11 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" - coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -66,8 +63,6 @@ type Configurator struct { informerFactory informers.SharedInformerFactory - podInformer coreinformers.PodInformer - // Close this to stop all reflectors StopEverything <-chan struct{} @@ -178,7 +173,7 @@ func (c *Configurator) create() (*Scheduler, error) { // Setup cache debugger. debugger := cachedebugger.New( c.informerFactory.Core().V1().Nodes().Lister(), - c.podInformer.Lister(), + c.informerFactory.Core().V1().Pods().Lister(), c.schedulerCache, podQueue, ) @@ -412,29 +407,6 @@ func getPredicateConfigs(keys sets.String, lr *frameworkplugins.LegacyRegistry, return &plugins, pluginConfig, nil } -type podInformer struct { - informer cache.SharedIndexInformer -} - -func (i *podInformer) Informer() cache.SharedIndexInformer { - return i.informer -} - -func (i *podInformer) Lister() corelisters.PodLister { - return corelisters.NewPodLister(i.informer.GetIndexer()) -} - -// NewPodInformer creates a shared index informer that returns only non-terminal pods. -func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer { - selector := fields.ParseSelectorOrDie( - "status.phase!=" + string(v1.PodSucceeded) + - ",status.phase!=" + string(v1.PodFailed)) - lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector) - return &podInformer{ - informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - } -} - // MakeDefaultErrorFunc construct a function to handle pod scheduler error func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) { return func(podInfo *framework.QueuedPodInfo, err error) { diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 3ab7608945b..5e6769e893f 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -453,7 +453,6 @@ func newConfigFactoryWithFrameworkRegistry( return &Configurator{ client: client, informerFactory: informerFactory, - podInformer: informerFactory.Core().V1().Pods(), disablePreemption: disablePodPreemption, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, podInitialBackoffSeconds: podInitialBackoffDurationSeconds, diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 7181e67ee3c..e1b9f91ffed 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -27,10 +27,10 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" - coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -83,8 +83,6 @@ type Scheduler struct { // Profiles are the scheduling profiles. Profiles profile.Map - scheduledPodsHasSynced func() bool - client clientset.Interface } @@ -185,7 +183,6 @@ var defaultSchedulerOptions = schedulerOptions{ // New returns a Scheduler func New(client clientset.Interface, informerFactory informers.SharedInformerFactory, - podInformer coreinformers.PodInformer, recorderFactory profile.RecorderFactory, stopCh <-chan struct{}, opts ...Option) (*Scheduler, error) { @@ -213,7 +210,6 @@ func New(client clientset.Interface, client: client, recorderFactory: recorderFactory, informerFactory: informerFactory, - podInformer: podInformer, schedulerCache: schedulerCache, StopEverything: stopEverything, percentageOfNodesToScore: options.percentageOfNodesToScore, @@ -266,9 +262,8 @@ func New(client clientset.Interface, // Additional tweaks to the config produced by the configurator. sched.StopEverything = stopEverything sched.client = client - sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced - addAllEventHandlers(sched, informerFactory, podInformer) + addAllEventHandlers(sched, informerFactory) return sched, nil } @@ -310,9 +305,6 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *schedulerapi // Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done. func (sched *Scheduler) Run(ctx context.Context) { - if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { - return - } sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() @@ -646,3 +638,20 @@ func defaultAlgorithmSourceProviderName() *string { provider := schedulerapi.SchedulerDefaultProviderName return &provider } + +// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific +// in-place podInformer. +func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory { + informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod) + informerFactory.InformerFor(&v1.Pod{}, newPodInformer) + return informerFactory +} + +// newPodInformer creates a shared index informer that returns only non-terminal pods. +func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + selector := fields.ParseSelectorOrDie( + "status.phase!=" + string(v1.PodSucceeded) + + ",status.phase!=" + string(v1.PodFailed)) + lw := cache.NewListWatchFromClient(cs.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector) + return cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, nil) +} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 00679e3a244..f2bdb84aa01 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -178,7 +178,6 @@ func TestSchedulerCreation(t *testing.T) { defer close(stopCh) s, err := New(client, informerFactory, - NewPodInformer(client, 0), profile.NewRecorderFactory(eventBroadcaster), stopCh, tc.opts..., @@ -457,7 +456,6 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, 0) sched, err := New(client, informerFactory, - informerFactory.Core().V1().Pods(), profile.NewRecorderFactory(broadcaster), ctx.Done(), WithProfiles( diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 804a4b835c8..5da26e95e81 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -91,7 +91,6 @@ func setupScheduler( sched, err := scheduler.New( cs, informerFactory, - informerFactory.Core().V1().Pods(), profile.NewRecorderFactory(eventBroadcaster), ctx.Done(), ) @@ -102,7 +101,6 @@ func setupScheduler( eventBroadcaster.StartRecordingToSink(ctx.Done()) go sched.Run(ctx) - return } func testLabels() map[string]string { diff --git a/test/integration/node/lifecycle_test.go b/test/integration/node/lifecycle_test.go index a8f5e58eca4..a0ba5c384af 100644 --- a/test/integration/node/lifecycle_test.go +++ b/test/integration/node/lifecycle_test.go @@ -131,7 +131,7 @@ func TestTaintBasedEvictions(t *testing.T) { podTolerations.SetExternalKubeClientSet(externalClientset) podTolerations.SetExternalKubeInformerFactory(externalInformers) - testCtx = testutils.InitTestScheduler(t, testCtx, true, nil) + testCtx = testutils.InitTestScheduler(t, testCtx, nil) defer testutils.CleanupTest(t, testCtx) cs := testCtx.ClientSet _, err := cs.CoreV1().Namespaces().Create(context.TODO(), testCtx.NS, metav1.CreateOptions{}) diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index 47ed7bf349d..f39600e5607 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -350,7 +350,7 @@ func TestSchedulerExtender(t *testing.T) { } policy.APIVersion = "v1" - testCtx = testutils.InitTestScheduler(t, testCtx, false, &policy) + testCtx = testutils.InitTestScheduler(t, testCtx, &policy) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) defer testutils.CleanupTest(t, testCtx) diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 3996123c195..c63ee13ca48 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -1124,7 +1124,7 @@ func TestBindPlugin(t *testing.T) { } // Create the scheduler with the test plugin set. - testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, false, nil, time.Second, + testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, nil, time.Second, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) testutils.SyncInformerFactory(testCtx) @@ -1846,7 +1846,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) { } func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext { - testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, false, nil, time.Second, opts...) + testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, nil, time.Second, opts...) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 92a4f8db38d..214763ee3e1 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -149,7 +149,7 @@ func TestPreemption(t *testing.T) { } testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestMaster(t, "preemption", nil), - false, nil, time.Second, + nil, time.Second, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) testutils.SyncInformerFactory(testCtx) @@ -596,7 +596,7 @@ func TestDisablePreemption(t *testing.T) { // This test verifies that system critical priorities are created automatically and resolved properly. func TestPodPriorityResolution(t *testing.T) { admission := priority.NewPlugin() - testCtx := testutils.InitTestScheduler(t, testutils.InitTestMaster(t, "preemption", admission), true, nil) + testCtx := testutils.InitTestScheduler(t, testutils.InitTestMaster(t, "preemption", admission), nil) defer testutils.CleanupTest(t, testCtx) cs := testCtx.ClientSet diff --git a/test/integration/scheduler/priorities_test.go b/test/integration/scheduler/priorities_test.go index 28fdd782f00..ea0f1311a84 100644 --- a/test/integration/scheduler/priorities_test.go +++ b/test/integration/scheduler/priorities_test.go @@ -56,7 +56,6 @@ func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *tes testCtx := testutils.InitTestSchedulerWithOptions( t, testutils.InitTestMaster(t, strings.ToLower(scorePluginName), nil), - false, nil, 0, scheduler.WithProfiles(prof), diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 4b15ffcc9ff..4493a5c27d7 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" @@ -64,7 +63,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) defer clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) - informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + informerFactory := scheduler.NewInformerFactory(clientSet, 0) for i, test := range []struct { policy string @@ -275,7 +274,6 @@ priorities: [] sched, err := scheduler.New(clientSet, informerFactory, - scheduler.NewPodInformer(clientSet, 0), profile.NewRecorderFactory(eventBroadcaster), nil, scheduler.WithAlgorithmSource(kubeschedulerconfig.SchedulerAlgorithmSource{ @@ -321,7 +319,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) defer clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) - informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + informerFactory := scheduler.NewInformerFactory(clientSet, 0) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet.EventsV1()}) stopCh := make(chan struct{}) @@ -329,7 +327,6 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { _, err := scheduler.New(clientSet, informerFactory, - scheduler.NewPodInformer(clientSet, 0), profile.NewRecorderFactory(eventBroadcaster), nil, scheduler.WithAlgorithmSource(kubeschedulerconfig.SchedulerAlgorithmSource{ @@ -562,7 +559,7 @@ func TestMultipleSchedulers(t *testing.T) { // 5. create and start a scheduler with name "foo-scheduler" fooProf := kubeschedulerconfig.KubeSchedulerProfile{SchedulerName: fooScheduler} - testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, true, nil, time.Second, scheduler.WithProfiles(fooProf)) + testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, nil, time.Second, scheduler.WithProfiles(fooProf)) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go index bd7c3e91b5c..2f511325567 100644 --- a/test/integration/scheduler/taint_test.go +++ b/test/integration/scheduler/taint_test.go @@ -75,7 +75,7 @@ func TestTaintNodeByCondition(t *testing.T) { admission.SetExternalKubeClientSet(externalClientset) admission.SetExternalKubeInformerFactory(externalInformers) - testCtx = testutils.InitTestScheduler(t, testCtx, false, nil) + testCtx = testutils.InitTestScheduler(t, testCtx, nil) defer testutils.CleanupTest(t, testCtx) cs := testCtx.ClientSet diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index e752a81726f..633aa7222ef 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -80,7 +80,7 @@ func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *dis // initTest initializes a test environment and creates master and scheduler with default // configuration. func initTest(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext { - testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestMaster(t, nsPrefix, nil), true, nil, time.Second, opts...) + testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestMaster(t, nsPrefix, nil), nil, time.Second, opts...) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) return testCtx @@ -100,7 +100,7 @@ func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestCon }, } testCtx := testutils.InitTestSchedulerWithOptions( - t, testutils.InitTestMaster(t, nsPrefix, nil), true, nil, + t, testutils.InitTestMaster(t, nsPrefix, nil), nil, time.Second, scheduler.WithProfiles(prof)) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 08d195f6dce..b269403fd85 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -73,12 +73,11 @@ func StartApiserver() (string, ShutdownFunc) { } // StartScheduler configures and starts a scheduler given a handle to the clientSet interface -// and event broadcaster. It returns the running scheduler and the shutdown function to stop it. +// and event broadcaster. It returns the running scheduler, podInformer and the shutdown function to stop it. func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) { ctx, cancel := context.WithCancel(context.Background()) - informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - podInformer := informerFactory.Core().V1().Pods() + informerFactory := scheduler.NewInformerFactory(clientSet, 0) evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ Interface: clientSet.EventsV1()}) @@ -87,7 +86,6 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein sched, err := scheduler.New( clientSet, informerFactory, - podInformer, profile.NewRecorderFactory(evtBroadcaster), ctx.Done()) if err != nil { @@ -95,6 +93,7 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein } informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) go sched.Run(ctx) shutdownFunc := func() { @@ -102,7 +101,7 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein cancel() klog.Infof("destroyed scheduler") } - return sched, podInformer, shutdownFunc + return sched, informerFactory.Core().V1().Pods(), shutdownFunc } // StartFakePVController is a simplified pv controller logic that sets PVC VolumeName and annotation for each PV binding. @@ -371,11 +370,10 @@ func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) { func InitTestScheduler( t *testing.T, testCtx *TestContext, - setPodInformer bool, policy *schedulerapi.Policy, ) *TestContext { // Pod preemption is enabled by default scheduler configuration. - return InitTestSchedulerWithOptions(t, testCtx, setPodInformer, policy, time.Second) + return InitTestSchedulerWithOptions(t, testCtx, policy, time.Second) } // InitTestSchedulerWithOptions initializes a test environment and creates a scheduler with default @@ -383,22 +381,13 @@ func InitTestScheduler( func InitTestSchedulerWithOptions( t *testing.T, testCtx *TestContext, - setPodInformer bool, policy *schedulerapi.Policy, resyncPeriod time.Duration, opts ...scheduler.Option, ) *TestContext { // 1. Create scheduler - testCtx.InformerFactory = informers.NewSharedInformerFactory(testCtx.ClientSet, resyncPeriod) + testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, resyncPeriod) - var podInformer coreinformers.PodInformer - - // create independent pod informer if required - if setPodInformer { - podInformer = scheduler.NewPodInformer(testCtx.ClientSet, 12*time.Hour) - } else { - podInformer = testCtx.InformerFactory.Core().V1().Pods() - } var err error eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ Interface: testCtx.ClientSet.EventsV1(), @@ -410,7 +399,6 @@ func InitTestSchedulerWithOptions( testCtx.Scheduler, err = scheduler.New( testCtx.ClientSet, testCtx.InformerFactory, - podInformer, profile.NewRecorderFactory(eventBroadcaster), testCtx.Ctx.Done(), opts..., @@ -420,12 +408,6 @@ func InitTestSchedulerWithOptions( t.Fatalf("Couldn't create scheduler: %v", err) } - // set setPodInformer if provided. - if setPodInformer { - go podInformer.Informer().Run(testCtx.Scheduler.StopEverything) - cache.WaitForNamedCacheSync("scheduler", testCtx.Scheduler.StopEverything, podInformer.Informer().HasSynced) - } - stopCh := make(chan struct{}) eventBroadcaster.StartRecordingToSink(stopCh) diff --git a/test/integration/volumescheduling/util.go b/test/integration/volumescheduling/util.go index 10431f5fc40..b240f7e0e6f 100644 --- a/test/integration/volumescheduling/util.go +++ b/test/integration/volumescheduling/util.go @@ -115,7 +115,6 @@ func initTestSchedulerWithOptions( // 1. Create scheduler testCtx.informerFactory = informers.NewSharedInformerFactory(testCtx.clientSet, resyncPeriod) - podInformer := testCtx.informerFactory.Core().V1().Pods() eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ Interface: testCtx.clientSet.EventsV1(), }) @@ -124,7 +123,6 @@ func initTestSchedulerWithOptions( testCtx.scheduler, err = scheduler.New( testCtx.clientSet, testCtx.informerFactory, - podInformer, profile.NewRecorderFactory(eventBroadcaster), testCtx.ctx.Done())