Merge pull request #2505 from abhgupta/abhgupta-dev

Scheduler changes for extensibility
This commit is contained in:
Brendan Burns
2014-12-09 15:25:37 -08:00
12 changed files with 444 additions and 115 deletions

View File

@@ -60,8 +60,11 @@ func main() {
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
configFactory := &factory.ConfigFactory{Client: kubeClient}
config := configFactory.Create()
configFactory := factory.NewConfigFactory(kubeClient)
config, err := configFactory.Create(nil, nil)
if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err)
}
s := scheduler.New(config)
s.Run()

View File

@@ -40,46 +40,77 @@ import (
// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
// queue for pods that need scheduling
PodQueue *cache.FIFO
// a means to list all scheduled pods
PodLister *storeToPodLister
// a means to list all minions
MinionLister *storeToMinionLister
// map of strings to predicate functions to be used
// to filter the minions for scheduling pods
PredicateMap map[string]algorithm.FitPredicate
// map of strings to priority config to be used
// to prioritize the filtered minions for scheduling pods
PriorityMap map[string]algorithm.PriorityConfig
}
// NewConfigFactory initializes the factory.
func NewConfigFactory(client *client.Client) *ConfigFactory {
// initialize the factory struct
factory := &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(),
PodLister: &storeToPodLister{cache.NewStore()},
MinionLister: &storeToMinionLister{cache.NewStore()},
PredicateMap: make(map[string]algorithm.FitPredicate),
PriorityMap: make(map[string]algorithm.PriorityConfig),
}
factory.addDefaultPredicates()
factory.addDefaultPriorities()
return factory
}
// Create creates a scheduler and all support functions.
func (factory *ConfigFactory) Create() *scheduler.Config {
func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) {
if predicateKeys == nil {
glog.V(2).Infof("Custom predicates list not provided, using default predicates")
predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"}
}
predicateFuncs, err := factory.getPredicateFunctions(predicateKeys)
if err != nil {
return nil, err
}
if priorityKeys == nil {
glog.V(2).Infof("Custom priority list not provided, using default priority: LeastRequestedPriority")
priorityKeys = []string{"LeastRequestedPriority"}
}
priorityConfigs, err := factory.getPriorityConfigs(priorityKeys)
if err != nil {
return nil, err
}
// Watch and queue pods that need scheduling.
podQueue := cache.NewFIFO()
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run()
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run()
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
podCache := cache.NewStore()
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run()
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
minionCache := cache.NewStore()
if false {
// Disable this code until minions support watches.
cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run()
cache.NewReflector(factory.createMinionLW(), &api.Minion{}, factory.MinionLister.Store).Run()
} else {
cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run()
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
minionLister := &storeToMinionLister{minionCache}
algo := algorithm.NewGenericScheduler(
[]algorithm.FitPredicate{
// Fit is defined based on the absence of port conflicts.
algorithm.PodFitsPorts,
// Fit is determined by resource availability
algorithm.NewResourceFitPredicate(minionLister),
// Fit is determined by non-conflicting disk volumes
algorithm.NoDiskConflict,
// Fit is determined by node selector query
algorithm.NewSelectorMatchPredicate(minionLister),
},
// Prioritize nodes by least requested utilization.
algorithm.LeastRequestedPriority,
&storeToPodLister{podCache}, r)
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, factory.PodLister, r)
podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{},
@@ -87,19 +118,77 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
}
return &scheduler.Config{
MinionLister: minionLister,
MinionLister: factory.MinionLister,
Algorithm: algo,
Binder: &binder{factory.Client},
NextPod: func() *api.Pod {
pod := podQueue.Pop().(*api.Pod)
glog.V(2).Infof("About to try and schedule pod %v\n"+
"\tknown minions: %v\n"+
"\tknown scheduled pods: %v\n",
pod.Name, minionCache.ContainedIDs(), podCache.ContainedIDs())
pod := factory.PodQueue.Pop().(*api.Pod)
glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name)
return pod
},
Error: factory.makeDefaultErrorFunc(&podBackoff, podQueue),
Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue),
}, nil
}
func (factory *ConfigFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) {
predicates := []algorithm.FitPredicate{}
for _, key := range keys {
function, ok := factory.PredicateMap[key]
if !ok {
return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
}
predicates = append(predicates, function)
}
return predicates, nil
}
func (factory *ConfigFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) {
configs := []algorithm.PriorityConfig{}
for _, key := range keys {
config, ok := factory.PriorityMap[key]
if !ok {
return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key)
}
configs = append(configs, config)
}
return configs, nil
}
func (factory *ConfigFactory) addDefaultPredicates() {
// Fit is defined based on the absence of port conflicts.
factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts)
// Fit is determined by resource availability
factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister))
// Fit is determined by non-conflicting disk volumes
factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict)
// Fit is determined by node selector query
factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister))
}
func (factory *ConfigFactory) AddPredicate(key string, function algorithm.FitPredicate) {
factory.PredicateMap[key] = function
}
func (factory *ConfigFactory) addDefaultPriorities() {
// Prioritize nodes by least requested utilization.
factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1)
// spreads pods by minimizing the number of pods on the same minion with the same labels.
factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1)
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0)
}
func (factory *ConfigFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) {
factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight}
}
func (factory *ConfigFactory) SetWeight(key string, weight int) {
config, ok := factory.PriorityMap[key]
if !ok {
glog.Errorf("Invalid priority key %s specified - no corresponding function found", key)
return
}
config.Weight = weight
}
type listWatch struct {

View File

@@ -42,12 +42,12 @@ func TestCreate(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
factory := ConfigFactory{client}
factory.Create()
factory := NewConfigFactory(client)
factory.Create(nil, nil)
}
func TestCreateLists(t *testing.T) {
factory := ConfigFactory{nil}
factory := NewConfigFactory(nil)
table := []struct {
location string
factory func() *listWatch
@@ -85,7 +85,7 @@ func TestCreateLists(t *testing.T) {
}
func TestCreateWatches(t *testing.T) {
factory := ConfigFactory{nil}
factory := NewConfigFactory(nil)
table := []struct {
rv string
location string
@@ -136,6 +136,7 @@ func TestCreateWatches(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.factory().Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
@@ -167,7 +168,7 @@ func TestPollMinions(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
cf := ConfigFactory{client}
cf := NewConfigFactory(client)
ce, err := cf.pollMinions()
if err != nil {
@@ -194,7 +195,7 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle("/api/"+testapi.Version()+"/pods/foo", &handler)
server := httptest.NewServer(mux)
defer server.Close()
factory := ConfigFactory{client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})}
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}))
queue := cache.NewFIFO()
podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{},