Merge pull request #82896 from ahg-g/ahg-config

Refactor the scheduler's configuration logic.
This commit is contained in:
Kubernetes Prow Robot 2019-09-20 16:17:46 -07:00 committed by GitHub
commit 6d58376baa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -155,8 +155,10 @@ type Configurator struct {
storageClassLister storagelistersv1.StorageClassLister
// a means to list all CSINodes
csiNodeLister storagelistersv1beta1.CSINodeLister
// framework has a set of plugins and the context used for running them.
framework framework.Framework
// a means to list all Nodes
nodeLister corelisters.NodeLister
// a means to list all Pods
podLister corelisters.PodLister
// Close this to stop all reflectors
StopEverything <-chan struct{}
@ -183,10 +185,13 @@ type Configurator struct {
percentageOfNodesToScore int32
bindTimeoutSeconds int64
// queue for pods that need scheduling
podQueue internalqueue.SchedulingQueue
enableNonPreempting bool
// framework configuration arguments.
registry framework.Registry
plugins *config.Plugins
pluginConfig []config.PluginConfig
}
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
@ -222,11 +227,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
}
schedulerCache := internalcache.New(30*time.Second, stopEverything)
framework, err := framework.NewFramework(args.Registry, args.Plugins, args.PluginConfig)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
}
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelistersv1.StorageClassLister
if args.StorageClassInformer != nil {
@ -240,7 +240,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
c := &Configurator{
client: args.Client,
podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
@ -248,9 +247,10 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
nodeLister: args.NodeInformer.Lister(),
podLister: args.PodInformer.Lister(),
storageClassLister: storageClassLister,
csiNodeLister: csiNodeLister,
framework: framework,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
@ -258,24 +258,14 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
percentageOfNodesToScore: args.PercentageOfNodesToScore,
bindTimeoutSeconds: args.BindTimeoutSeconds,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority),
registry: args.Registry,
plugins: args.Plugins,
pluginConfig: args.PluginConfig,
}
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// Setup cache debugger
debugger := cachedebugger.New(
args.NodeInformer.Lister(),
args.PodInformer.Lister(),
c.schedulerCache,
c.podQueue,
)
debugger.ListenForSignal(c.StopEverything)
go func() {
<-c.StopEverything
c.podQueue.Close()
}()
return c
}
@ -405,14 +395,35 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
return nil, err
}
framework, err := framework.NewFramework(c.registry, c.plugins, c.pluginConfig)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
}
podQueue := internalqueue.NewSchedulingQueue(c.StopEverything, framework)
// Setup cache debugger
debugger := cachedebugger.New(
c.nodeLister,
c.podLister,
c.schedulerCache,
podQueue,
)
debugger.ListenForSignal(c.StopEverything)
go func() {
<-c.StopEverything
podQueue.Close()
}()
algo := core.NewGenericScheduler(
c.schedulerCache,
c.podQueue,
podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
c.framework,
framework,
extenders,
c.volumeBinder,
c.pVCLister,
@ -429,15 +440,15 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
Framework: c.framework,
Framework: framework,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache, c.StopEverything),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
SchedulingQueue: podQueue,
}, nil
}