diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 06fd30f726f..c600f6c0893 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -155,8 +155,10 @@ type Configurator struct { storageClassLister storagelistersv1.StorageClassLister // a means to list all CSINodes csiNodeLister storagelistersv1beta1.CSINodeLister - // framework has a set of plugins and the context used for running them. - framework framework.Framework + // a means to list all Nodes + nodeLister corelisters.NodeLister + // a means to list all Pods + podLister corelisters.PodLister // Close this to stop all reflectors StopEverything <-chan struct{} @@ -183,10 +185,13 @@ type Configurator struct { percentageOfNodesToScore int32 bindTimeoutSeconds int64 - // queue for pods that need scheduling - podQueue internalqueue.SchedulingQueue enableNonPreempting bool + + // framework configuration arguments. + registry framework.Registry + plugins *config.Plugins + pluginConfig []config.PluginConfig } // ConfigFactoryArgs is a set arguments passed to NewConfigFactory. @@ -222,11 +227,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator { } schedulerCache := internalcache.New(30*time.Second, stopEverything) - framework, err := framework.NewFramework(args.Registry, args.Plugins, args.PluginConfig) - if err != nil { - klog.Fatalf("error initializing the scheduling framework: %v", err) - } - // storageClassInformer is only enabled through VolumeScheduling feature gate var storageClassLister storagelistersv1.StorageClassLister if args.StorageClassInformer != nil { @@ -240,7 +240,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator { c := &Configurator{ client: args.Client, - podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework), pVLister: args.PvInformer.Lister(), pVCLister: args.PvcInformer.Lister(), serviceLister: args.ServiceInformer.Lister(), @@ -248,9 +247,10 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator { replicaSetLister: args.ReplicaSetInformer.Lister(), statefulSetLister: args.StatefulSetInformer.Lister(), pdbLister: args.PdbInformer.Lister(), + nodeLister: args.NodeInformer.Lister(), + podLister: args.PodInformer.Lister(), storageClassLister: storageClassLister, csiNodeLister: csiNodeLister, - framework: framework, schedulerCache: schedulerCache, StopEverything: stopEverything, hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight, @@ -258,24 +258,14 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator { percentageOfNodesToScore: args.PercentageOfNodesToScore, bindTimeoutSeconds: args.BindTimeoutSeconds, enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority), + registry: args.Registry, + plugins: args.Plugins, + pluginConfig: args.PluginConfig, } // Setup volume binder c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced - // Setup cache debugger - debugger := cachedebugger.New( - args.NodeInformer.Lister(), - args.PodInformer.Lister(), - c.schedulerCache, - c.podQueue, - ) - debugger.ListenForSignal(c.StopEverything) - - go func() { - <-c.StopEverything - c.podQueue.Close() - }() return c } @@ -405,14 +395,35 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e return nil, err } + framework, err := framework.NewFramework(c.registry, c.plugins, c.pluginConfig) + if err != nil { + klog.Fatalf("error initializing the scheduling framework: %v", err) + } + + podQueue := internalqueue.NewSchedulingQueue(c.StopEverything, framework) + + // Setup cache debugger + debugger := cachedebugger.New( + c.nodeLister, + c.podLister, + c.schedulerCache, + podQueue, + ) + debugger.ListenForSignal(c.StopEverything) + + go func() { + <-c.StopEverything + podQueue.Close() + }() + algo := core.NewGenericScheduler( c.schedulerCache, - c.podQueue, + podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, - c.framework, + framework, extenders, c.volumeBinder, c.pVCLister, @@ -429,15 +440,15 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e GetBinder: getBinderFunc(c.client, extenders), PodConditionUpdater: &podConditionUpdater{c.client}, PodPreemptor: &podPreemptor{c.client}, - Framework: c.framework, + Framework: framework, WaitForCacheSync: func() bool { return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) }, - NextPod: internalqueue.MakeNextPodFunc(c.podQueue), - Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything), + NextPod: internalqueue.MakeNextPodFunc(podQueue), + Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache, c.StopEverything), StopEverything: c.StopEverything, VolumeBinder: c.volumeBinder, - SchedulingQueue: c.podQueue, + SchedulingQueue: podQueue, }, nil }