mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Initialize scheduler cache with assigned non-terminated pods before scheduling.
This commit is contained in:
parent
20fa30e4b5
commit
c78faec4ff
@ -127,6 +127,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForCacheSync waits for all started informers' cache were synced.
|
||||
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
|
||||
informers := func()map[reflect.Type]cache.SharedIndexInformer{
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
informers := map[reflect.Type]cache.SharedIndexInformer{}
|
||||
for informerType, informer := range f.informers {
|
||||
if f.startedInformers[informerType] {
|
||||
informers[informerType] = informer
|
||||
}
|
||||
}
|
||||
return informers
|
||||
}()
|
||||
|
||||
res := map[reflect.Type]bool{}
|
||||
for informType, informer := range informers {
|
||||
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
|
||||
// client.
|
||||
func (f *sharedInformerFactory) InformerFor(obj {{.runtimeObject|raw}}, newFunc {{.interfacesNewInformerFunc|raw}}) {{.cacheSharedIndexInformer|raw}} {
|
||||
@ -152,6 +174,7 @@ var sharedInformerFactoryInterface = `
|
||||
type SharedInformerFactory interface {
|
||||
{{.informerFactoryInterface|raw}}
|
||||
ForResource(resource {{.schemaGroupVersionResource|raw}}) (GenericInformer, error)
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
|
||||
|
||||
{{$gvInterfaces := .gvInterfaces}}
|
||||
{{range $groupName, $group := .groupVersions}}{{$groupName}}() {{index $gvInterfaces $groupName|raw}}
|
||||
|
@ -94,6 +94,8 @@ func Run(s *options.SchedulerServer) error {
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
informerFactory.Start(stop)
|
||||
// Waiting for all cache to sync before scheduling.
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
|
||||
run := func(_ <-chan struct{}) {
|
||||
sched.Run()
|
||||
|
@ -380,6 +380,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
||||
Algorithm: algo,
|
||||
Binder: &binder{f.client},
|
||||
PodConditionUpdater: &podConditionUpdater{f.client},
|
||||
WaitForCacheSync: func() bool {
|
||||
return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodPopulator.HasSynced)
|
||||
},
|
||||
NextPod: func() *v1.Pod {
|
||||
return f.getNextPod()
|
||||
},
|
||||
|
@ -110,6 +110,10 @@ type Config struct {
|
||||
// stale while they sit in a channel.
|
||||
NextPod func() *v1.Pod
|
||||
|
||||
// WaitForCacheSync waits for scheduler cache to populate.
|
||||
// It returns true if it was successful, false if the controller should shutdown.
|
||||
WaitForCacheSync func() bool
|
||||
|
||||
// Error is called if there is an error. It is passed the pod in
|
||||
// question, and the error
|
||||
Error func(*v1.Pod, error)
|
||||
@ -140,8 +144,12 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
|
||||
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
|
||||
func (sched *Scheduler) Run() {
|
||||
if !sched.config.WaitForCacheSync() {
|
||||
return
|
||||
}
|
||||
|
||||
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
|
||||
}
|
||||
|
||||
|
@ -543,6 +543,9 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
|
||||
bindingChan <- b
|
||||
return nil
|
||||
}},
|
||||
WaitForCacheSync: func() bool {
|
||||
return true
|
||||
},
|
||||
NextPod: func() *v1.Pod {
|
||||
return clientcache.Pop(queuedPodStore).(*v1.Pod)
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user