From a689ad4cda7b5008cc84119fb910218d2e46b488 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Thu, 16 Sep 2021 19:33:00 -0700 Subject: [PATCH] sched: start dynamicInformerFactory along with regular informerFactory (#105016) * sched: start dynamicInformerFactory along with regular informerFactory * fixup: start all informers and then wait for their syncs --- cmd/kube-scheduler/app/config/config.go | 8 +++-- cmd/kube-scheduler/app/options/options.go | 4 +++ cmd/kube-scheduler/app/server.go | 9 +++++ .../apis/config/testing/compatibility_test.go | 1 + pkg/scheduler/factory_test.go | 1 + pkg/scheduler/scheduler.go | 14 +------- pkg/scheduler/scheduler_test.go | 2 ++ test/integration/daemonset/daemonset_test.go | 1 + test/integration/scheduler/scheduler_test.go | 2 ++ test/integration/util/util.go | 33 ++++++++++++++----- test/integration/volumescheduling/util.go | 1 + 11 files changed, 51 insertions(+), 25 deletions(-) diff --git a/cmd/kube-scheduler/app/config/config.go b/cmd/kube-scheduler/app/config/config.go index 45574e7f306..b050626e10a 100644 --- a/cmd/kube-scheduler/app/config/config.go +++ b/cmd/kube-scheduler/app/config/config.go @@ -18,6 +18,7 @@ package config import ( apiserver "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -40,9 +41,10 @@ type Config struct { Authorization apiserver.AuthorizationInfo SecureServing *apiserver.SecureServingInfo - Client clientset.Interface - KubeConfig *restclient.Config - InformerFactory informers.SharedInformerFactory + Client clientset.Interface + KubeConfig *restclient.Config + InformerFactory informers.SharedInformerFactory + DynInformerFactory dynamicinformer.DynamicSharedInformerFactory //lint:ignore SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done. EventBroadcaster events.EventBroadcasterAdapter diff --git a/cmd/kube-scheduler/app/options/options.go b/cmd/kube-scheduler/app/options/options.go index 5c7d193e9bc..173aedf9287 100644 --- a/cmd/kube-scheduler/app/options/options.go +++ b/cmd/kube-scheduler/app/options/options.go @@ -27,6 +27,8 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" apiserveroptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -284,6 +286,8 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) { c.Client = client c.KubeConfig = kubeConfig c.InformerFactory = scheduler.NewInformerFactory(client, 0) + dynClient := dynamic.NewForConfigOrDie(kubeConfig) + c.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, corev1.NamespaceAll, nil) c.LeaderElection = leaderElectionConfig return c, nil diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 47d148dfaf6..ee5b5a462e1 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -180,9 +180,17 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * // Start all informers. cc.InformerFactory.Start(ctx.Done()) + // DynInformerFactory can be nil in tests. + if cc.DynInformerFactory != nil { + cc.DynInformerFactory.Start(ctx.Done()) + } // Wait for all caches to sync before scheduling. cc.InformerFactory.WaitForCacheSync(ctx.Done()) + // DynInformerFactory can be nil in tests. + if cc.DynInformerFactory != nil { + cc.DynInformerFactory.WaitForCacheSync(ctx.Done()) + } // If leader election is enabled, runCommand via LeaderElector until done and exit. if cc.LeaderElection != nil { @@ -303,6 +311,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions // Create the scheduler. sched, err := scheduler.New(cc.Client, cc.InformerFactory, + cc.DynInformerFactory, recorderFactory, ctx.Done(), scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion), diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index 855b51794e6..e47fc987abe 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -245,6 +245,7 @@ func TestPolicyCompatibility(t *testing.T) { sched, err := scheduler.New( client, informerFactory, + nil, recorderFactory, make(chan struct{}), scheduler.WithProfiles([]config.KubeSchedulerProfile(nil)...), diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 84c54b86a02..71be9fb33b6 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -422,6 +422,7 @@ func TestCreateFromConfig(t *testing.T) { _, err := New( client, informerFactory, + nil, recorderFactory, make(chan struct{}), WithProfiles([]schedulerapi.KubeSchedulerProfile(nil)...), diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 16841f84bd2..4cf15c0f3a9 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -31,7 +31,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" @@ -216,6 +215,7 @@ var defaultSchedulerOptions = schedulerOptions{ // New returns a Scheduler func New(client clientset.Interface, informerFactory informers.SharedInformerFactory, + dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, recorderFactory profile.RecorderFactory, stopCh <-chan struct{}, opts ...Option) (*Scheduler, error) { @@ -307,20 +307,8 @@ func New(client clientset.Interface, sched.StopEverything = stopEverything sched.client = client - // Build dynamic client and dynamic informer factory - var dynInformerFactory dynamicinformer.DynamicSharedInformerFactory - // options.kubeConfig can be nil in tests. - if options.kubeConfig != nil { - dynClient := dynamic.NewForConfigOrDie(options.kubeConfig) - dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil) - } - addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) - if dynInformerFactory != nil { - dynInformerFactory.Start(stopEverything) - } - return sched, nil } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 4ab0ea984ea..06be14085fb 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -245,6 +245,7 @@ func TestSchedulerCreation(t *testing.T) { s, err := New( client, informerFactory, + nil, profile.NewRecorderFactory(eventBroadcaster), stopCh, tc.opts..., @@ -553,6 +554,7 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) { sched, err := New( client, informerFactory, + nil, profile.NewRecorderFactory(broadcaster), ctx.Done(), WithProfiles( diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index ec34573f370..d6b610bd112 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -90,6 +90,7 @@ func setupScheduler( sched, err := scheduler.New( cs, informerFactory, + nil, profile.NewRecorderFactory(eventBroadcaster), ctx.Done(), ) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index dbbdf33d9b8..734e28c6dd4 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -280,6 +280,7 @@ priorities: [] sched, err := scheduler.New(clientSet, informerFactory, + nil, profile.NewRecorderFactory(eventBroadcaster), nil, scheduler.WithProfiles([]config.KubeSchedulerProfile(nil)...), @@ -337,6 +338,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { _, err := scheduler.New(clientSet, informerFactory, + nil, profile.NewRecorderFactory(eventBroadcaster), nil, scheduler.WithLegacyPolicySource(&config.SchedulerPolicySource{ diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 177fa69133b..577cc9b5f00 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -87,6 +89,7 @@ func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config sched, err := scheduler.New( clientSet, informerFactory, + nil, profile.NewRecorderFactory(evtBroadcaster), ctx.Done(), scheduler.WithKubeConfig(kubeConfig), @@ -156,15 +159,16 @@ func StartFakePVController(clientSet clientset.Interface) ShutdownFunc { // TestContext store necessary context info type TestContext struct { - CloseFn framework.CloseFunc - HTTPServer *httptest.Server - NS *v1.Namespace - ClientSet clientset.Interface - KubeConfig *restclient.Config - InformerFactory informers.SharedInformerFactory - Scheduler *scheduler.Scheduler - Ctx context.Context - CancelFn context.CancelFunc + CloseFn framework.CloseFunc + HTTPServer *httptest.Server + NS *v1.Namespace + ClientSet clientset.Interface + KubeConfig *restclient.Config + InformerFactory informers.SharedInformerFactory + DynInformerFactory dynamicinformer.DynamicSharedInformerFactory + Scheduler *scheduler.Scheduler + Ctx context.Context + CancelFn context.CancelFunc } // CleanupNodes cleans all nodes which were created during integration test @@ -192,7 +196,13 @@ func PodDeleted(c clientset.Interface, podNamespace, podName string) wait.Condit // SyncInformerFactory starts informer and waits for caches to be synced func SyncInformerFactory(testCtx *TestContext) { testCtx.InformerFactory.Start(testCtx.Ctx.Done()) + if testCtx.DynInformerFactory != nil { + testCtx.DynInformerFactory.Start(testCtx.Ctx.Done()) + } testCtx.InformerFactory.WaitForCacheSync(testCtx.Ctx.Done()) + if testCtx.DynInformerFactory != nil { + testCtx.DynInformerFactory.WaitForCacheSync(testCtx.Ctx.Done()) + } } // CleanupTest cleans related resources which were created during integration test @@ -395,6 +405,10 @@ func InitTestSchedulerWithOptions( ) *TestContext { // 1. Create scheduler testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, 0) + if testCtx.KubeConfig != nil { + dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig) + testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil) + } var err error eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ @@ -408,6 +422,7 @@ func InitTestSchedulerWithOptions( testCtx.Scheduler, err = scheduler.New( testCtx.ClientSet, testCtx.InformerFactory, + testCtx.DynInformerFactory, profile.NewRecorderFactory(eventBroadcaster), testCtx.Ctx.Done(), opts..., diff --git a/test/integration/volumescheduling/util.go b/test/integration/volumescheduling/util.go index 5441b5d5ce7..4217b7d5d76 100644 --- a/test/integration/volumescheduling/util.go +++ b/test/integration/volumescheduling/util.go @@ -123,6 +123,7 @@ func initTestSchedulerWithOptions( testCtx.scheduler, err = scheduler.New( testCtx.clientSet, testCtx.informerFactory, + nil, profile.NewRecorderFactory(eventBroadcaster), testCtx.ctx.Done())