From 55a1c826bbd71388672aa543586d1f8431daf1c0 Mon Sep 17 00:00:00 2001 From: Filip Grzadkowski Date: Mon, 23 May 2016 14:19:46 +0200 Subject: [PATCH] Refactor scheduler to expose predicates to cluster autoscaler --- plugin/pkg/scheduler/factory/factory.go | 99 ++++++++++++++++--------- 1 file changed, 62 insertions(+), 37 deletions(-) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 50f8a59cfbf..d4f03b1bc17 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -305,6 +305,64 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.HardPodAffinitySymmetricWeight) } + predicateFuncs, err := f.GetPredicates(predicateKeys) + if err != nil { + return nil, err + } + + priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys) + if err != nil { + return nil, err + } + + f.Run() + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r) + + podBackoff := podBackoff{ + perPodBackoff: map[types.NamespacedName]*backoffEntry{}, + clock: realClock{}, + + defaultDuration: 1 * time.Second, + maxDuration: 60 * time.Second, + } + + return &scheduler.Config{ + SchedulerCache: f.schedulerCache, + // The scheduler only needs to consider schedulable nodes. + NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), + Algorithm: algo, + Binder: &binder{f.Client}, + PodConditionUpdater: &podConditionUpdater{f.Client}, + NextPod: func() *api.Pod { + return f.getNextPod() + }, + Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + StopEverything: f.StopEverything, + }, nil +} + +func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { + pluginArgs, err := f.getPluginArgs() + if err != nil { + return nil, err + } + + return getPriorityFunctionConfigs(priorityKeys, *pluginArgs) +} + +func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { + pluginArgs, err := f.getPluginArgs() + if err != nil { + return nil, err + } + + return getFitPredicateFunctions(predicateKeys, *pluginArgs) +} + +func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { failureDomainArgs := strings.Split(f.FailureDomains, ",") for _, failureDomain := range failureDomainArgs { if errs := utilvalidation.IsQualifiedName(failureDomain); len(errs) != 0 { @@ -312,7 +370,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, } } - pluginArgs := PluginFactoryArgs{ + return &PluginFactoryArgs{ PodLister: f.PodLister, ServiceLister: f.ServiceLister, ControllerLister: f.ControllerLister, @@ -324,17 +382,10 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, PVCInfo: f.PVCLister, HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight, FailureDomains: sets.NewString(failureDomainArgs...).List(), - } - predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs) - if err != nil { - return nil, err - } - - priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, pluginArgs) - if err != nil { - return nil, err - } + }, nil +} +func (f *ConfigFactory) Run() { // Watch and queue pods that need scheduling. cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything) @@ -363,32 +414,6 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally. cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything) - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - - algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r) - - podBackoff := podBackoff{ - perPodBackoff: map[types.NamespacedName]*backoffEntry{}, - clock: realClock{}, - - defaultDuration: 1 * time.Second, - maxDuration: 60 * time.Second, - } - - return &scheduler.Config{ - SchedulerCache: f.schedulerCache, - // The scheduler only needs to consider schedulable nodes. - NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), - Algorithm: algo, - Binder: &binder{f.Client}, - PodConditionUpdater: &podConditionUpdater{f.Client}, - NextPod: func() *api.Pod { - return f.getNextPod() - }, - Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), - StopEverything: f.StopEverything, - }, nil } func (f *ConfigFactory) getNextPod() *api.Pod {