diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 60d20157439..c539c73c649 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -169,7 +169,7 @@ func startComponents(manifestURL string) (apiServerURL string) { // Scheduler schedulerConfigFactory := factory.NewConfigFactory(cl) - schedulerConfig, err := schedulerConfigFactory.Create(nil, nil) + schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { glog.Fatal("Couldn't create scheduler config: %v", err) } diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 78eec46793b..1fd1dae54ce 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -38,6 +38,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" + _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" "github.com/golang/glog" @@ -105,7 +106,7 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p func RunScheduler(cl *client.Client) { // Scheduler schedulerConfigFactory := factory.NewConfigFactory(cl) - schedulerConfig, err := schedulerConfigFactory.Create(nil, nil) + schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { glog.Fatal("Couldn't create scheduler config: %v", err) } diff --git a/plugin/cmd/kube-scheduler/scheduler.go b/plugin/cmd/kube-scheduler/scheduler.go index 75af1276ea4..35c250b448f 100644 --- a/plugin/cmd/kube-scheduler/scheduler.go +++ b/plugin/cmd/kube-scheduler/scheduler.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" + _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" "github.com/golang/glog" ) @@ -61,7 +62,7 @@ func main() { go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) configFactory := factory.NewConfigFactory(kubeClient) - config, err := configFactory.Create(nil, nil) + config, err := configFactory.Create() if err != nil { glog.Fatalf("Failed to create scheduler configuration: %v", err) } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go new file mode 100644 index 00000000000..d91f2d132c0 --- /dev/null +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -0,0 +1,52 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This is the default algorithm provider for the scheduler. +package defaults + +import ( + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" +) + +func init() { + factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities()) +} + +func defaultPredicates() util.StringSet { + return util.NewStringSet( + // Fit is defined based on the absence of port conflicts. + factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts), + // Fit is determined by resource availability + factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)), + // Fit is determined by non-conflicting disk volumes + factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict), + // Fit is determined by node selector query + factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)), + ) +} + +func defaultPriorities() util.StringSet { + return util.NewStringSet( + // Prioritize nodes by least requested utilization. + factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1), + // spreads pods by minimizing the number of pods on the same minion with the same labels. + factory.RegisterPriorityFunction("SpreadingPriority", algorithm.CalculateSpreadPriority, 1), + // EqualPriority is a prioritizer function that gives an equal weight of one to all minions + factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), + ) +} diff --git a/plugin/pkg/scheduler/algorithmprovider/plugins.go b/plugin/pkg/scheduler/algorithmprovider/plugins.go new file mode 100644 index 00000000000..d534b05c0bf --- /dev/null +++ b/plugin/pkg/scheduler/algorithmprovider/plugins.go @@ -0,0 +1,22 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This package is used to register algorithm provider plugins. +package algorithmprovider + +import ( + _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults" +) diff --git a/plugin/pkg/scheduler/algorithmprovider/plugins_test.go b/plugin/pkg/scheduler/algorithmprovider/plugins_test.go new file mode 100644 index 00000000000..82db467cb93 --- /dev/null +++ b/plugin/pkg/scheduler/algorithmprovider/plugins_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package algorithmprovider + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" +) + +var ( + algorithmProviderNames = []string{ + factory.DefaultProvider, + } +) + +func TestDefaultConfigExists(t *testing.T) { + p, err := factory.GetAlgorithmProvider(factory.DefaultProvider) + if err != nil { + t.Errorf("error retrivieving default provider: %v", err) + } + if p == nil { + t.Error("algorithm provider config should not be nil") + } + if len(p.FitPredicateKeys) == 0 { + t.Error("default algorithm provider shouldn't have 0 fit predicates") + } +} + +func TestAlgorithmProviders(t *testing.T) { + for _, pn := range algorithmProviderNames { + p, err := factory.GetAlgorithmProvider(pn) + if err != nil { + t.Errorf("error retrivieving '%s' provider: %v", pn, err) + break + } + if len(p.PriorityFunctionKeys) == 0 { + t.Error("%s algorithm provider shouldn't have 0 priority functions", pn) + } + for _, pf := range p.PriorityFunctionKeys.List() { + if !factory.IsPriorityFunctionRegistered(pf) { + t.Errorf("priority function %s is not registerd but is used in the %s algorithm provider", pf, pn) + } + } + for _, fp := range p.FitPredicateKeys.List() { + if !factory.IsFitPredicateRegistered(fp) { + t.Errorf("fit predicate %s is not registerd but is used in the %s algorithm provider", fp, pn) + } + } + } +} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index cd3fc51f389..e2b44f17fc4 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -37,6 +37,11 @@ import ( "github.com/golang/glog" ) +var ( + PodLister = &storeToPodLister{cache.NewStore()} + MinionLister = &storeToNodeLister{cache.NewStore()} +) + // ConfigFactory knows how to fill out a scheduler config with its support functions. type ConfigFactory struct { Client *client.Client @@ -46,71 +51,66 @@ type ConfigFactory struct { PodLister *storeToPodLister // a means to list all minions MinionLister *storeToNodeLister - // map of strings to predicate functions to be used - // to filter the minions for scheduling pods - PredicateMap map[string]algorithm.FitPredicate - // map of strings to priority config to be used - // to prioritize the filtered minions for scheduling pods - PriorityMap map[string]algorithm.PriorityConfig } // NewConfigFactory initializes the factory. func NewConfigFactory(client *client.Client) *ConfigFactory { - // initialize the factory struct - factory := &ConfigFactory{ + return &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(), - PodLister: &storeToPodLister{cache.NewStore()}, - MinionLister: &storeToNodeLister{cache.NewStore()}, - PredicateMap: make(map[string]algorithm.FitPredicate), - PriorityMap: make(map[string]algorithm.PriorityConfig), + PodLister: PodLister, + MinionLister: MinionLister, } - - factory.addDefaultPredicates() - factory.addDefaultPriorities() - - return factory } -// Create creates a scheduler and all support functions. -func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) { - if predicateKeys == nil { - glog.V(2).Infof("Custom predicates list not provided, using default predicates") - predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"} - } - predicateFuncs, err := factory.getPredicateFunctions(predicateKeys) +// Create creates a scheduler with the default algorithm provider. +func (f *ConfigFactory) Create() (*scheduler.Config, error) { + return f.CreateFromProvider(DefaultProvider) +} + +// CreateFromProvider creates a scheduler from the name of a registered algorithm provider. +func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) { + glog.V(2).Infof("creating scheduler from algorithm provider '%v'", providerName) + provider, err := GetAlgorithmProvider(providerName) if err != nil { return nil, err } - if priorityKeys == nil { - glog.V(2).Infof("Custom priority list not provided, using default priority: LeastRequestedPriority") - priorityKeys = []string{"LeastRequestedPriority"} + return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys) +} + +// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys. +func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) { + glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) + predicateFuncs, err := getFitPredicateFunctions(predicateKeys) + if err != nil { + return nil, err } - priorityConfigs, err := factory.getPriorityConfigs(priorityKeys) + + priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys) if err != nil { return nil, err } // Watch and queue pods that need scheduling. - cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run() + cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue).Run() // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. - cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run() + cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. if false { // Disable this code until minions support watches. - cache.NewReflector(factory.createMinionLW(), &api.Node{}, factory.MinionLister.Store).Run() + cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store).Run() } else { - cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run() + cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run() } r := rand.New(rand.NewSource(time.Now().UnixNano())) - algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, factory.PodLister, r) + algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r) podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{}, @@ -118,79 +118,18 @@ func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*sch } return &scheduler.Config{ - MinionLister: factory.MinionLister, + MinionLister: f.MinionLister, Algorithm: algo, - Binder: &binder{factory.Client}, + Binder: &binder{f.Client}, NextPod: func() *api.Pod { - pod := factory.PodQueue.Pop().(*api.Pod) + pod := f.PodQueue.Pop().(*api.Pod) glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name) return pod }, - Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue), + Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), }, nil } -func (factory *ConfigFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) { - predicates := []algorithm.FitPredicate{} - for _, key := range keys { - function, ok := factory.PredicateMap[key] - if !ok { - return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key) - } - predicates = append(predicates, function) - } - return predicates, nil -} - -func (factory *ConfigFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) { - configs := []algorithm.PriorityConfig{} - for _, key := range keys { - config, ok := factory.PriorityMap[key] - if !ok { - return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key) - } - configs = append(configs, config) - } - return configs, nil -} - -func (factory *ConfigFactory) addDefaultPredicates() { - // Fit is defined based on the absence of port conflicts. - factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts) - // Fit is determined by resource availability - factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)) - // Fit is determined by non-conflicting disk volumes - factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict) - // Fit is determined by node selector query - factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)) -} - -func (factory *ConfigFactory) AddPredicate(key string, function algorithm.FitPredicate) { - factory.PredicateMap[key] = function -} - -func (factory *ConfigFactory) addDefaultPriorities() { - // Prioritize nodes by least requested utilization. - factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1) - // spreads pods by minimizing the number of pods on the same minion with the same labels. - factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1) - // EqualPriority is a prioritizer function that gives an equal weight of one to all minions - factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0) -} - -func (factory *ConfigFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) { - factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight} -} - -func (factory *ConfigFactory) SetWeight(key string, weight int) { - config, ok := factory.PriorityMap[key] - if !ok { - glog.Errorf("Invalid priority key %s specified - no corresponding function found", key) - return - } - config.Weight = weight -} - type listWatch struct { client *client.Client fieldSelector labels.Selector diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 80e1864bc08..4cfbf62ac39 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -43,7 +43,7 @@ func TestCreate(t *testing.T) { defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) factory := NewConfigFactory(client) - factory.Create(nil, nil) + factory.Create() } func TestCreateLists(t *testing.T) { diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go new file mode 100644 index 00000000000..ca77bb019f9 --- /dev/null +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -0,0 +1,147 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "fmt" + "sync" + + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/golang/glog" +) + +var ( + schedulerFactoryMutex sync.Mutex + + // maps that hold registered algorithm types + fitPredicateMap = make(map[string]algorithm.FitPredicate) + priorityFunctionMap = make(map[string]algorithm.PriorityConfig) + algorithmProviderMap = make(map[string]AlgorithmProviderConfig) +) + +const ( + DefaultProvider = "default" +) + +type AlgorithmProviderConfig struct { + FitPredicateKeys util.StringSet + PriorityFunctionKeys util.StringSet +} + +// RegisterFitPredicate registers a fit predicate with the algorithm registry. Returns the key, +// with which the predicate was registered. +func RegisterFitPredicate(key string, predicate algorithm.FitPredicate) string { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + fitPredicateMap[key] = predicate + return key +} + +// IsFitPredicateRegistered check is useful for testing providers. +func IsFitPredicateRegistered(key string) bool { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + _, ok := fitPredicateMap[key] + return ok +} + +// RegisterFitPredicate registers a priority function with the algorithm registry. Returns the key, +// with which the function was registered. +func RegisterPriorityFunction(key string, function algorithm.PriorityFunction, weight int) string { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + priorityFunctionMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight} + return key +} + +// IsPriorityFunctionRegistered check is useful for testing providers. +func IsPriorityFunctionRegistered(key string) bool { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + _, ok := priorityFunctionMap[key] + return ok +} + +// SetPriorityFunctionWeight sets the weight of an already registered priority function. +func SetPriorityFunctionWeight(key string, weight int) { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + config, ok := priorityFunctionMap[key] + if !ok { + glog.Errorf("Invalid priority key %s specified - no corresponding function found", key) + return + } + config.Weight = weight +} + +// RegisterAlgorithmProvider registers a new algorithm provider with the algorithm registry. This should +// be called from the init function in a provider plugin. +func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys util.StringSet) string { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + algorithmProviderMap[name] = AlgorithmProviderConfig{ + FitPredicateKeys: predicateKeys, + PriorityFunctionKeys: priorityKeys, + } + return name +} + +// GetAlgorithmProvider should not be used to modify providers. It is publicly visible for testing. +func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + + var provider AlgorithmProviderConfig + provider, ok := algorithmProviderMap[name] + if !ok { + return nil, fmt.Errorf("plugin '%v' has not been registered", provider) + } + + return &provider, nil +} + +func getFitPredicateFunctions(keys util.StringSet) ([]algorithm.FitPredicate, error) { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + + predicates := []algorithm.FitPredicate{} + for _, key := range keys.List() { + function, ok := fitPredicateMap[key] + if !ok { + return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key) + } + predicates = append(predicates, function) + } + return predicates, nil +} + +func getPriorityFunctionConfigs(keys util.StringSet) ([]algorithm.PriorityConfig, error) { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + + configs := []algorithm.PriorityConfig{} + for _, key := range keys.List() { + config, ok := priorityFunctionMap[key] + if !ok { + return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key) + } + configs = append(configs, config) + } + return configs, nil +}