Refactor scheduler to expose predicates to cluster autoscaler

This commit is contained in:
Filip Grzadkowski 2016-05-23 14:19:46 +02:00
parent 51e308412b
commit 55a1c826bb

View File

@ -305,6 +305,64 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.HardPodAffinitySymmetricWeight)
}
predicateFuncs, err := f.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}
priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}
f.Run()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
clock: realClock{},
defaultDuration: 1 * time.Second,
maxDuration: 60 * time.Second,
}
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
Binder: &binder{f.Client},
PodConditionUpdater: &podConditionUpdater{f.Client},
NextPod: func() *api.Pod {
return f.getNextPod()
},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
StopEverything: f.StopEverything,
}, nil
}
func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
return nil, err
}
return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
}
func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
return nil, err
}
return getFitPredicateFunctions(predicateKeys, *pluginArgs)
}
func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
failureDomainArgs := strings.Split(f.FailureDomains, ",")
for _, failureDomain := range failureDomainArgs {
if errs := utilvalidation.IsQualifiedName(failureDomain); len(errs) != 0 {
@ -312,7 +370,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
}
}
pluginArgs := PluginFactoryArgs{
return &PluginFactoryArgs{
PodLister: f.PodLister,
ServiceLister: f.ServiceLister,
ControllerLister: f.ControllerLister,
@ -324,17 +382,10 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
PVCInfo: f.PVCLister,
HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight,
FailureDomains: sets.NewString(failureDomainArgs...).List(),
}
predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
if err != nil {
return nil, err
}
priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, pluginArgs)
if err != nil {
return nil, err
}
}, nil
}
func (f *ConfigFactory) Run() {
// Watch and queue pods that need scheduling.
cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)
@ -363,32 +414,6 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
clock: realClock{},
defaultDuration: 1 * time.Second,
maxDuration: 60 * time.Second,
}
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
Binder: &binder{f.Client},
PodConditionUpdater: &podConditionUpdater{f.Client},
NextPod: func() *api.Pod {
return f.getNextPod()
},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
StopEverything: f.StopEverything,
}, nil
}
func (f *ConfigFactory) getNextPod() *api.Pod {