diff --git a/cmd/libs/go2idl/informer-gen/generators/factory.go b/cmd/libs/go2idl/informer-gen/generators/factory.go index 5c2be00ab04..b9dad9b4d52 100644 --- a/cmd/libs/go2idl/informer-gen/generators/factory.go +++ b/cmd/libs/go2idl/informer-gen/generators/factory.go @@ -127,6 +127,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func()map[reflect.Type]cache.SharedIndexInformer{ + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj {{.runtimeObject|raw}}, newFunc {{.interfacesNewInformerFunc|raw}}) {{.cacheSharedIndexInformer|raw}} { @@ -152,6 +174,7 @@ var sharedInformerFactoryInterface = ` type SharedInformerFactory interface { {{.informerFactoryInterface|raw}} ForResource(resource {{.schemaGroupVersionResource|raw}}) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {{$gvInterfaces := .gvInterfaces}} {{range $groupName, $group := .groupVersions}}{{$groupName}}() {{index $gvInterfaces $groupName|raw}} diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index c164a6283d5..00aa3166c95 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -94,6 +94,8 @@ func Run(s *options.SchedulerServer) error { stop := make(chan struct{}) defer close(stop) informerFactory.Start(stop) + // Waiting for all cache to sync before scheduling. + informerFactory.WaitForCacheSync(stop) run := func(_ <-chan struct{}) { sched.Run() diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index be8d5c34663..312276f122d 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -380,6 +380,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, Algorithm: algo, Binder: &binder{f.client}, PodConditionUpdater: &podConditionUpdater{f.client}, + WaitForCacheSync: func() bool { + return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodPopulator.HasSynced) + }, NextPod: func() *v1.Pod { return f.getNextPod() }, diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 2fc87efc91a..bebcc454fb9 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -110,6 +110,10 @@ type Config struct { // stale while they sit in a channel. NextPod func() *v1.Pod + // WaitForCacheSync waits for scheduler cache to populate. + // It returns true if it was successful, false if the controller should shutdown. + WaitForCacheSync func() bool + // Error is called if there is an error. It is passed the pod in // question, and the error Error func(*v1.Pod, error) @@ -140,8 +144,12 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul return s, nil } -// Run begins watching and scheduling. It starts a goroutine and returns immediately. +// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately. func (sched *Scheduler) Run() { + if !sched.config.WaitForCacheSync() { + return + } + go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 18ff57d236e..e9c4c21fd9e 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -543,6 +543,9 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc bindingChan <- b return nil }}, + WaitForCacheSync: func() bool { + return true + }, NextPod: func() *v1.Pod { return clientcache.Pop(queuedPodStore).(*v1.Pod) },