mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #84268 from draveness/feature/remove-wait-for-cache-sync
feat: remove WaitForCacheSync from scheduler factory
This commit is contained in:
commit
c08161a781
@ -86,10 +86,6 @@ type Config struct {
|
|||||||
// stale while they sit in a channel.
|
// stale while they sit in a channel.
|
||||||
NextPod func() *framework.PodInfo
|
NextPod func() *framework.PodInfo
|
||||||
|
|
||||||
// WaitForCacheSync waits for scheduler cache to populate.
|
|
||||||
// It returns true if it was successful, false if the controller should shutdown.
|
|
||||||
WaitForCacheSync func() bool
|
|
||||||
|
|
||||||
// Error is called if there is an error. It is passed the pod in
|
// Error is called if there is an error. It is passed the pod in
|
||||||
// question, and the error
|
// question, and the error
|
||||||
Error func(*framework.PodInfo, error)
|
Error func(*framework.PodInfo, error)
|
||||||
@ -147,8 +143,6 @@ type Configurator struct {
|
|||||||
// Close this to stop all reflectors
|
// Close this to stop all reflectors
|
||||||
StopEverything <-chan struct{}
|
StopEverything <-chan struct{}
|
||||||
|
|
||||||
scheduledPodsHasSynced cache.InformerSynced
|
|
||||||
|
|
||||||
schedulerCache internalcache.Cache
|
schedulerCache internalcache.Cache
|
||||||
|
|
||||||
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
|
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
|
||||||
@ -261,7 +255,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
|
|||||||
pluginConfig: args.PluginConfig,
|
pluginConfig: args.PluginConfig,
|
||||||
pluginConfigProducerRegistry: args.PluginConfigProducerRegistry,
|
pluginConfigProducerRegistry: args.PluginConfigProducerRegistry,
|
||||||
}
|
}
|
||||||
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
|
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
@ -454,13 +447,10 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
|
|||||||
)
|
)
|
||||||
|
|
||||||
return &Config{
|
return &Config{
|
||||||
SchedulerCache: c.schedulerCache,
|
SchedulerCache: c.schedulerCache,
|
||||||
Algorithm: algo,
|
Algorithm: algo,
|
||||||
GetBinder: getBinderFunc(c.client, extenders),
|
GetBinder: getBinderFunc(c.client, extenders),
|
||||||
Framework: framework,
|
Framework: framework,
|
||||||
WaitForCacheSync: func() bool {
|
|
||||||
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
|
|
||||||
},
|
|
||||||
NextPod: internalqueue.MakeNextPodFunc(podQueue),
|
NextPod: internalqueue.MakeNextPodFunc(podQueue),
|
||||||
Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
|
Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
|
||||||
StopEverything: c.StopEverything,
|
StopEverything: c.StopEverything,
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/events"
|
"k8s.io/client-go/tools/events"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||||
@ -95,10 +96,6 @@ type Scheduler struct {
|
|||||||
// stale while they sit in a channel.
|
// stale while they sit in a channel.
|
||||||
NextPod func() *framework.PodInfo
|
NextPod func() *framework.PodInfo
|
||||||
|
|
||||||
// WaitForCacheSync waits for scheduler cache to populate.
|
|
||||||
// It returns true if it was successful, false if the controller should shutdown.
|
|
||||||
WaitForCacheSync func() bool
|
|
||||||
|
|
||||||
// Error is called if there is an error. It is passed the pod in
|
// Error is called if there is an error. It is passed the pod in
|
||||||
// question, and the error
|
// question, and the error
|
||||||
Error func(*framework.PodInfo, error)
|
Error func(*framework.PodInfo, error)
|
||||||
@ -117,6 +114,8 @@ type Scheduler struct {
|
|||||||
|
|
||||||
// SchedulingQueue holds pods to be scheduled
|
// SchedulingQueue holds pods to be scheduled
|
||||||
SchedulingQueue internalqueue.SchedulingQueue
|
SchedulingQueue internalqueue.SchedulingQueue
|
||||||
|
|
||||||
|
scheduledPodsHasSynced func() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache returns the cache in scheduler for test to check the data in scheduler.
|
// Cache returns the cache in scheduler for test to check the data in scheduler.
|
||||||
@ -348,6 +347,7 @@ func New(client clientset.Interface,
|
|||||||
sched := NewFromConfig(config)
|
sched := NewFromConfig(config)
|
||||||
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
|
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
|
||||||
sched.podPreemptor = &podPreemptorImpl{client}
|
sched.podPreemptor = &podPreemptorImpl{client}
|
||||||
|
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
|
||||||
|
|
||||||
AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
|
AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
|
||||||
return sched, nil
|
return sched, nil
|
||||||
@ -398,7 +398,6 @@ func NewFromConfig(config *Config) *Scheduler {
|
|||||||
GetBinder: config.GetBinder,
|
GetBinder: config.GetBinder,
|
||||||
Framework: config.Framework,
|
Framework: config.Framework,
|
||||||
NextPod: config.NextPod,
|
NextPod: config.NextPod,
|
||||||
WaitForCacheSync: config.WaitForCacheSync,
|
|
||||||
Error: config.Error,
|
Error: config.Error,
|
||||||
Recorder: config.Recorder,
|
Recorder: config.Recorder,
|
||||||
StopEverything: config.StopEverything,
|
StopEverything: config.StopEverything,
|
||||||
@ -410,7 +409,7 @@ func NewFromConfig(config *Config) *Scheduler {
|
|||||||
|
|
||||||
// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
|
// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
|
||||||
func (sched *Scheduler) Run(ctx context.Context) {
|
func (sched *Scheduler) Run(ctx context.Context) {
|
||||||
if !sched.WaitForCacheSync() {
|
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -717,7 +717,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
|
|||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
},
|
},
|
||||||
WaitForCacheSync: func() bool {
|
scheduledPodsHasSynced: func() bool {
|
||||||
return true
|
return true
|
||||||
},
|
},
|
||||||
NextPod: func() *framework.PodInfo {
|
NextPod: func() *framework.PodInfo {
|
||||||
|
Loading…
Reference in New Issue
Block a user