sched: start dynamicInformerFactory along with regular informerFactory (#105016)

* sched: start dynamicInformerFactory along with regular informerFactory

* fixup: start all informers and then wait for their syncs
This commit is contained in:
Wei Huang 2021-09-16 19:33:00 -07:00 committed by GitHub
parent 2f10e6587c
commit a689ad4cda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 51 additions and 25 deletions

View File

@ -18,6 +18,7 @@ package config
import ( import (
apiserver "k8s.io/apiserver/pkg/server" apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@ -40,9 +41,10 @@ type Config struct {
Authorization apiserver.AuthorizationInfo Authorization apiserver.AuthorizationInfo
SecureServing *apiserver.SecureServingInfo SecureServing *apiserver.SecureServingInfo
Client clientset.Interface Client clientset.Interface
KubeConfig *restclient.Config KubeConfig *restclient.Config
InformerFactory informers.SharedInformerFactory InformerFactory informers.SharedInformerFactory
DynInformerFactory dynamicinformer.DynamicSharedInformerFactory
//lint:ignore SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done. //lint:ignore SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done.
EventBroadcaster events.EventBroadcasterAdapter EventBroadcaster events.EventBroadcasterAdapter

View File

@ -27,6 +27,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
apiserveroptions "k8s.io/apiserver/pkg/server/options" apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
@ -284,6 +286,8 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
c.Client = client c.Client = client
c.KubeConfig = kubeConfig c.KubeConfig = kubeConfig
c.InformerFactory = scheduler.NewInformerFactory(client, 0) c.InformerFactory = scheduler.NewInformerFactory(client, 0)
dynClient := dynamic.NewForConfigOrDie(kubeConfig)
c.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, corev1.NamespaceAll, nil)
c.LeaderElection = leaderElectionConfig c.LeaderElection = leaderElectionConfig
return c, nil return c, nil

View File

@ -180,9 +180,17 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
// Start all informers. // Start all informers.
cc.InformerFactory.Start(ctx.Done()) cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// Wait for all caches to sync before scheduling. // Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done()) cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
// If leader election is enabled, runCommand via LeaderElector until done and exit. // If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil { if cc.LeaderElection != nil {
@ -303,6 +311,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
// Create the scheduler. // Create the scheduler.
sched, err := scheduler.New(cc.Client, sched, err := scheduler.New(cc.Client,
cc.InformerFactory, cc.InformerFactory,
cc.DynInformerFactory,
recorderFactory, recorderFactory,
ctx.Done(), ctx.Done(),
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion), scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),

View File

@ -245,6 +245,7 @@ func TestPolicyCompatibility(t *testing.T) {
sched, err := scheduler.New( sched, err := scheduler.New(
client, client,
informerFactory, informerFactory,
nil,
recorderFactory, recorderFactory,
make(chan struct{}), make(chan struct{}),
scheduler.WithProfiles([]config.KubeSchedulerProfile(nil)...), scheduler.WithProfiles([]config.KubeSchedulerProfile(nil)...),

View File

@ -422,6 +422,7 @@ func TestCreateFromConfig(t *testing.T) {
_, err := New( _, err := New(
client, client,
informerFactory, informerFactory,
nil,
recorderFactory, recorderFactory,
make(chan struct{}), make(chan struct{}),
WithProfiles([]schedulerapi.KubeSchedulerProfile(nil)...), WithProfiles([]schedulerapi.KubeSchedulerProfile(nil)...),

View File

@ -31,7 +31,6 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
@ -216,6 +215,7 @@ var defaultSchedulerOptions = schedulerOptions{
// New returns a Scheduler // New returns a Scheduler
func New(client clientset.Interface, func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory, informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory, recorderFactory profile.RecorderFactory,
stopCh <-chan struct{}, stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) { opts ...Option) (*Scheduler, error) {
@ -307,20 +307,8 @@ func New(client clientset.Interface,
sched.StopEverything = stopEverything sched.StopEverything = stopEverything
sched.client = client sched.client = client
// Build dynamic client and dynamic informer factory
var dynInformerFactory dynamicinformer.DynamicSharedInformerFactory
// options.kubeConfig can be nil in tests.
if options.kubeConfig != nil {
dynClient := dynamic.NewForConfigOrDie(options.kubeConfig)
dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
}
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
if dynInformerFactory != nil {
dynInformerFactory.Start(stopEverything)
}
return sched, nil return sched, nil
} }

View File

@ -245,6 +245,7 @@ func TestSchedulerCreation(t *testing.T) {
s, err := New( s, err := New(
client, client,
informerFactory, informerFactory,
nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
stopCh, stopCh,
tc.opts..., tc.opts...,
@ -553,6 +554,7 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
sched, err := New( sched, err := New(
client, client,
informerFactory, informerFactory,
nil,
profile.NewRecorderFactory(broadcaster), profile.NewRecorderFactory(broadcaster),
ctx.Done(), ctx.Done(),
WithProfiles( WithProfiles(

View File

@ -90,6 +90,7 @@ func setupScheduler(
sched, err := scheduler.New( sched, err := scheduler.New(
cs, cs,
informerFactory, informerFactory,
nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
ctx.Done(), ctx.Done(),
) )

View File

@ -280,6 +280,7 @@ priorities: []
sched, err := scheduler.New(clientSet, sched, err := scheduler.New(clientSet,
informerFactory, informerFactory,
nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
nil, nil,
scheduler.WithProfiles([]config.KubeSchedulerProfile(nil)...), scheduler.WithProfiles([]config.KubeSchedulerProfile(nil)...),
@ -337,6 +338,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
_, err := scheduler.New(clientSet, _, err := scheduler.New(clientSet,
informerFactory, informerFactory,
nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
nil, nil,
scheduler.WithLegacyPolicySource(&config.SchedulerPolicySource{ scheduler.WithLegacyPolicySource(&config.SchedulerPolicySource{

View File

@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -87,6 +89,7 @@ func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config
sched, err := scheduler.New( sched, err := scheduler.New(
clientSet, clientSet,
informerFactory, informerFactory,
nil,
profile.NewRecorderFactory(evtBroadcaster), profile.NewRecorderFactory(evtBroadcaster),
ctx.Done(), ctx.Done(),
scheduler.WithKubeConfig(kubeConfig), scheduler.WithKubeConfig(kubeConfig),
@ -156,15 +159,16 @@ func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
// TestContext store necessary context info // TestContext store necessary context info
type TestContext struct { type TestContext struct {
CloseFn framework.CloseFunc CloseFn framework.CloseFunc
HTTPServer *httptest.Server HTTPServer *httptest.Server
NS *v1.Namespace NS *v1.Namespace
ClientSet clientset.Interface ClientSet clientset.Interface
KubeConfig *restclient.Config KubeConfig *restclient.Config
InformerFactory informers.SharedInformerFactory InformerFactory informers.SharedInformerFactory
Scheduler *scheduler.Scheduler DynInformerFactory dynamicinformer.DynamicSharedInformerFactory
Ctx context.Context Scheduler *scheduler.Scheduler
CancelFn context.CancelFunc Ctx context.Context
CancelFn context.CancelFunc
} }
// CleanupNodes cleans all nodes which were created during integration test // CleanupNodes cleans all nodes which were created during integration test
@ -192,7 +196,13 @@ func PodDeleted(c clientset.Interface, podNamespace, podName string) wait.Condit
// SyncInformerFactory starts informer and waits for caches to be synced // SyncInformerFactory starts informer and waits for caches to be synced
func SyncInformerFactory(testCtx *TestContext) { func SyncInformerFactory(testCtx *TestContext) {
testCtx.InformerFactory.Start(testCtx.Ctx.Done()) testCtx.InformerFactory.Start(testCtx.Ctx.Done())
if testCtx.DynInformerFactory != nil {
testCtx.DynInformerFactory.Start(testCtx.Ctx.Done())
}
testCtx.InformerFactory.WaitForCacheSync(testCtx.Ctx.Done()) testCtx.InformerFactory.WaitForCacheSync(testCtx.Ctx.Done())
if testCtx.DynInformerFactory != nil {
testCtx.DynInformerFactory.WaitForCacheSync(testCtx.Ctx.Done())
}
} }
// CleanupTest cleans related resources which were created during integration test // CleanupTest cleans related resources which were created during integration test
@ -395,6 +405,10 @@ func InitTestSchedulerWithOptions(
) *TestContext { ) *TestContext {
// 1. Create scheduler // 1. Create scheduler
testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, 0) testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, 0)
if testCtx.KubeConfig != nil {
dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig)
testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
}
var err error var err error
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
@ -408,6 +422,7 @@ func InitTestSchedulerWithOptions(
testCtx.Scheduler, err = scheduler.New( testCtx.Scheduler, err = scheduler.New(
testCtx.ClientSet, testCtx.ClientSet,
testCtx.InformerFactory, testCtx.InformerFactory,
testCtx.DynInformerFactory,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
testCtx.Ctx.Done(), testCtx.Ctx.Done(),
opts..., opts...,

View File

@ -123,6 +123,7 @@ func initTestSchedulerWithOptions(
testCtx.scheduler, err = scheduler.New( testCtx.scheduler, err = scheduler.New(
testCtx.clientSet, testCtx.clientSet,
testCtx.informerFactory, testCtx.informerFactory,
nil,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
testCtx.ctx.Done()) testCtx.ctx.Done())