Refactor the scheduler's configuration logic.

This PR modifies the scheduler's configurator to allow for instantiating the framework at a later stage in the configuration. Specifically at the point where we know exactly which predicates/priorities need to be configured.

This is necessary to allow converting predicates/priorities configuration into a plugin configuration to facilitate framework migration.
This commit is contained in:
Abdullah Gharaibeh 2019-09-19 15:31:31 -04:00
parent 908fc3b98e
commit dc28cfca72

View File

@ -155,8 +155,10 @@ type Configurator struct {
storageClassLister storagelistersv1.StorageClassLister storageClassLister storagelistersv1.StorageClassLister
// a means to list all CSINodes // a means to list all CSINodes
csiNodeLister storagelistersv1beta1.CSINodeLister csiNodeLister storagelistersv1beta1.CSINodeLister
// framework has a set of plugins and the context used for running them. // a means to list all Nodes
framework framework.Framework nodeLister corelisters.NodeLister
// a means to list all Pods
podLister corelisters.PodLister
// Close this to stop all reflectors // Close this to stop all reflectors
StopEverything <-chan struct{} StopEverything <-chan struct{}
@ -183,10 +185,13 @@ type Configurator struct {
percentageOfNodesToScore int32 percentageOfNodesToScore int32
bindTimeoutSeconds int64 bindTimeoutSeconds int64
// queue for pods that need scheduling
podQueue internalqueue.SchedulingQueue
enableNonPreempting bool enableNonPreempting bool
// framework configuration arguments.
registry framework.Registry
plugins *config.Plugins
pluginConfig []config.PluginConfig
} }
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory. // ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
@ -222,11 +227,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
} }
schedulerCache := internalcache.New(30*time.Second, stopEverything) schedulerCache := internalcache.New(30*time.Second, stopEverything)
framework, err := framework.NewFramework(args.Registry, args.Plugins, args.PluginConfig)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
}
// storageClassInformer is only enabled through VolumeScheduling feature gate // storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelistersv1.StorageClassLister var storageClassLister storagelistersv1.StorageClassLister
if args.StorageClassInformer != nil { if args.StorageClassInformer != nil {
@ -240,7 +240,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
c := &Configurator{ c := &Configurator{
client: args.Client, client: args.Client,
podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
pVLister: args.PvInformer.Lister(), pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(), pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(), serviceLister: args.ServiceInformer.Lister(),
@ -248,9 +247,10 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
replicaSetLister: args.ReplicaSetInformer.Lister(), replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(), statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(), pdbLister: args.PdbInformer.Lister(),
nodeLister: args.NodeInformer.Lister(),
podLister: args.PodInformer.Lister(),
storageClassLister: storageClassLister, storageClassLister: storageClassLister,
csiNodeLister: csiNodeLister, csiNodeLister: csiNodeLister,
framework: framework,
schedulerCache: schedulerCache, schedulerCache: schedulerCache,
StopEverything: stopEverything, StopEverything: stopEverything,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight, hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
@ -258,24 +258,14 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
percentageOfNodesToScore: args.PercentageOfNodesToScore, percentageOfNodesToScore: args.PercentageOfNodesToScore,
bindTimeoutSeconds: args.BindTimeoutSeconds, bindTimeoutSeconds: args.BindTimeoutSeconds,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority), enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority),
registry: args.Registry,
plugins: args.Plugins,
pluginConfig: args.PluginConfig,
} }
// Setup volume binder // Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// Setup cache debugger
debugger := cachedebugger.New(
args.NodeInformer.Lister(),
args.PodInformer.Lister(),
c.schedulerCache,
c.podQueue,
)
debugger.ListenForSignal(c.StopEverything)
go func() {
<-c.StopEverything
c.podQueue.Close()
}()
return c return c
} }
@ -405,14 +395,35 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
return nil, err return nil, err
} }
framework, err := framework.NewFramework(c.registry, c.plugins, c.pluginConfig)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
}
podQueue := internalqueue.NewSchedulingQueue(c.StopEverything, framework)
// Setup cache debugger
debugger := cachedebugger.New(
c.nodeLister,
c.podLister,
c.schedulerCache,
podQueue,
)
debugger.ListenForSignal(c.StopEverything)
go func() {
<-c.StopEverything
podQueue.Close()
}()
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(
c.schedulerCache, c.schedulerCache,
c.podQueue, podQueue,
predicateFuncs, predicateFuncs,
predicateMetaProducer, predicateMetaProducer,
priorityConfigs, priorityConfigs,
priorityMetaProducer, priorityMetaProducer,
c.framework, framework,
extenders, extenders,
c.volumeBinder, c.volumeBinder,
c.pVCLister, c.pVCLister,
@ -429,15 +440,15 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
GetBinder: getBinderFunc(c.client, extenders), GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client}, PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client}, PodPreemptor: &podPreemptor{c.client},
Framework: c.framework, Framework: framework,
WaitForCacheSync: func() bool { WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
}, },
NextPod: internalqueue.MakeNextPodFunc(c.podQueue), NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything), Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache, c.StopEverything),
StopEverything: c.StopEverything, StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder, VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue, SchedulingQueue: podQueue,
}, nil }, nil
} }