From c78faec4ff286515dd728c010b1b4f839b05a687 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Sun, 7 May 2017 13:45:02 +0800 Subject: [PATCH 1/3] Initialize scheduler cache with assigned non-terminated pods before scheduling. --- .../go2idl/informer-gen/generators/factory.go | 23 +++++++++++++++++++ plugin/cmd/kube-scheduler/app/server.go | 2 ++ plugin/pkg/scheduler/factory/factory.go | 3 +++ plugin/pkg/scheduler/scheduler.go | 10 +++++++- plugin/pkg/scheduler/scheduler_test.go | 3 +++ 5 files changed, 40 insertions(+), 1 deletion(-) diff --git a/cmd/libs/go2idl/informer-gen/generators/factory.go b/cmd/libs/go2idl/informer-gen/generators/factory.go index 5c2be00ab04..b9dad9b4d52 100644 --- a/cmd/libs/go2idl/informer-gen/generators/factory.go +++ b/cmd/libs/go2idl/informer-gen/generators/factory.go @@ -127,6 +127,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func()map[reflect.Type]cache.SharedIndexInformer{ + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj {{.runtimeObject|raw}}, newFunc {{.interfacesNewInformerFunc|raw}}) {{.cacheSharedIndexInformer|raw}} { @@ -152,6 +174,7 @@ var sharedInformerFactoryInterface = ` type SharedInformerFactory interface { {{.informerFactoryInterface|raw}} ForResource(resource {{.schemaGroupVersionResource|raw}}) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {{$gvInterfaces := .gvInterfaces}} {{range $groupName, $group := .groupVersions}}{{$groupName}}() {{index $gvInterfaces $groupName|raw}} diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index c164a6283d5..00aa3166c95 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -94,6 +94,8 @@ func Run(s *options.SchedulerServer) error { stop := make(chan struct{}) defer close(stop) informerFactory.Start(stop) + // Waiting for all cache to sync before scheduling. + informerFactory.WaitForCacheSync(stop) run := func(_ <-chan struct{}) { sched.Run() diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index be8d5c34663..312276f122d 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -380,6 +380,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, Algorithm: algo, Binder: &binder{f.client}, PodConditionUpdater: &podConditionUpdater{f.client}, + WaitForCacheSync: func() bool { + return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodPopulator.HasSynced) + }, NextPod: func() *v1.Pod { return f.getNextPod() }, diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 2fc87efc91a..bebcc454fb9 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -110,6 +110,10 @@ type Config struct { // stale while they sit in a channel. NextPod func() *v1.Pod + // WaitForCacheSync waits for scheduler cache to populate. + // It returns true if it was successful, false if the controller should shutdown. + WaitForCacheSync func() bool + // Error is called if there is an error. It is passed the pod in // question, and the error Error func(*v1.Pod, error) @@ -140,8 +144,12 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul return s, nil } -// Run begins watching and scheduling. It starts a goroutine and returns immediately. +// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately. func (sched *Scheduler) Run() { + if !sched.config.WaitForCacheSync() { + return + } + go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 18ff57d236e..e9c4c21fd9e 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -543,6 +543,9 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc bindingChan <- b return nil }}, + WaitForCacheSync: func() bool { + return true + }, NextPod: func() *v1.Pod { return clientcache.Pop(queuedPodStore).(*v1.Pod) }, From 7bf698a2c811873c5742fd0ad3ec42ca861e99d1 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 9 May 2017 17:04:13 +0800 Subject: [PATCH 2/3] generated codes. --- .../externalversions/factory.go | 23 +++++++++++++++++++ .../internalversion/factory.go | 23 +++++++++++++++++++ .../informers/externalversions/factory.go | 23 +++++++++++++++++++ .../informers/internalversion/factory.go | 23 +++++++++++++++++++ .../informers/externalversions/factory.go | 23 +++++++++++++++++++ .../informers/internalversion/factory.go | 23 +++++++++++++++++++ 6 files changed, 138 insertions(+) diff --git a/pkg/client/informers/informers_generated/externalversions/factory.go b/pkg/client/informers/informers_generated/externalversions/factory.go index 761d756313d..913335df07d 100644 --- a/pkg/client/informers/informers_generated/externalversions/factory.go +++ b/pkg/client/informers/informers_generated/externalversions/factory.go @@ -73,6 +73,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { @@ -95,6 +117,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Apps() apps.Interface Autoscaling() autoscaling.Interface diff --git a/pkg/client/informers/informers_generated/internalversion/factory.go b/pkg/client/informers/informers_generated/internalversion/factory.go index 5e0798f57c9..b7613c1d211 100644 --- a/pkg/client/informers/informers_generated/internalversion/factory.go +++ b/pkg/client/informers/informers_generated/internalversion/factory.go @@ -73,6 +73,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { @@ -95,6 +117,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Apps() apps.Interface Autoscaling() autoscaling.Interface diff --git a/staging/src/k8s.io/kube-aggregator/pkg/client/informers/externalversions/factory.go b/staging/src/k8s.io/kube-aggregator/pkg/client/informers/externalversions/factory.go index a4ab4234929..96358d9f101 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/client/informers/externalversions/factory.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/client/informers/externalversions/factory.go @@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { @@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Apiregistration() apiregistration.Interface } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/factory.go b/staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/factory.go index 2724a133d44..b59cb1f55f5 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/factory.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/factory.go @@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { @@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Apiregistration() apiregistration.Interface } diff --git a/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/externalversions/factory.go b/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/externalversions/factory.go index b01b1f31195..bee98db85c2 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/externalversions/factory.go +++ b/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/externalversions/factory.go @@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { @@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Apiextensions() apiextensions.Interface } diff --git a/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/factory.go b/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/factory.go index 1de68e62a1c..5da778d39ff 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/factory.go +++ b/staging/src/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/factory.go @@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { @@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Apiextensions() apiextensions.Interface } From 3278de723add74849089e9563be20005bb018af5 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 9 May 2017 17:04:30 +0800 Subject: [PATCH 3/3] generated client-go. --- .../src/k8s.io/client-go/informers/factory.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/staging/src/k8s.io/client-go/informers/factory.go b/staging/src/k8s.io/client-go/informers/factory.go index 67e5f624b5e..4116be71dd9 100644 --- a/staging/src/k8s.io/client-go/informers/factory.go +++ b/staging/src/k8s.io/client-go/informers/factory.go @@ -73,6 +73,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + // InternalInformerFor returns the SharedIndexInformer for obj using an internal // client. func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { @@ -95,6 +117,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Apps() apps.Interface Autoscaling() autoscaling.Interface