diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index fc673027bc6..3a5026fd8c7 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -168,8 +168,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
 	handler.delegate = m.Handler
 
 	// Scheduler
-	schedulerConfigFactory := &factory.ConfigFactory{cl}
-	schedulerConfig := schedulerConfigFactory.Create()
+	schedulerConfigFactory := factory.NewConfigFactory(cl)
+	schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
+	if err != nil {
+		glog.Fatalf("Couldn't create scheduler config: %v", err)
+	}
 	scheduler.New(schedulerConfig).Run()
 
 	endpoints := service.NewEndpointController(cl)
diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go
index a65b726e9f0..4661c68ff0a 100644
--- a/pkg/scheduler/generic_scheduler.go
+++ b/pkg/scheduler/generic_scheduler.go
@@ -38,10 +38,15 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str
 	if err != nil {
 		return "", err
 	}
+	if len(minions.Items) == 0 {
+		return "", fmt.Errorf("no minions available to schedule pods")
+	}
+
 	filteredNodes, err := findNodesThatFit(pod, g.pods, g.predicates, minions)
 	if err != nil {
 		return "", err
 	}
+
 	priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes))
 	if err != nil {
 		return "", err
@@ -49,6 +54,7 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str
 	if len(priorityList) == 0 {
 		return "", fmt.Errorf("failed to find a fit for pod: %v", pod)
 	}
+
 	return g.selectHost(priorityList)
 }
 
diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go
index 782ca86399d..f8566184555 100644
--- a/pkg/scheduler/predicates.go
+++ b/pkg/scheduler/predicates.go
@@ -74,6 +74,7 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
 // TODO: migrate this into some per-volume specific code?
 func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
 	manifest := &(pod.Spec)
+	glog.Errorf("custom predicate NoDiskConflict --> node: %s", node)
 	for ix := range manifest.Volumes {
 		for podIx := range existingPods {
 			if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) {
@@ -104,6 +105,7 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
 
 // PodFitsResources calculates fit based on requested, rather than used resources
 func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
+	glog.Errorf("custom predicate PodFitsResources --> node: %s", node)
 	podRequest := getResourceRequest(&pod)
 	if podRequest.milliCPU == 0 && podRequest.memory == 0 {
 		// no resources requested always fits.
@@ -152,6 +154,7 @@ type NodeSelector struct {
 
 func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
 	if len(pod.Spec.NodeSelector) == 0 {
+		glog.Errorf("custom predicate PodSelectorMatches --> node: %s", node)
 		return true, nil
 	}
 	selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
@@ -163,6 +166,7 @@ func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, n
 }
 
 func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
+	glog.Errorf("custom predicate PodFitsPorts --> node: %s", node)
 	existingPorts := getUsedPorts(existingPods...)
 	wantPorts := getUsedPorts(pod)
 	for wport := range wantPorts {
diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go
index 7c3b09c9369..7941a8eaeb6 100644
--- a/pkg/standalone/standalone.go
+++ b/pkg/standalone/standalone.go
@@ -104,8 +104,11 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p
 // RunScheduler starts up a scheduler in it's own goroutine
 func RunScheduler(cl *client.Client) {
 	// Scheduler
-	schedulerConfigFactory := &factory.ConfigFactory{cl}
-	schedulerConfig := schedulerConfigFactory.Create()
+	schedulerConfigFactory := factory.NewConfigFactory(cl)
+	schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
+	if err != nil {
+		glog.Fatalf("Couldn't create scheduler config: %v", err)
+	}
 	scheduler.New(schedulerConfig).Run()
 }
 
diff --git a/plugin/cmd/kube-scheduler/scheduler.go b/plugin/cmd/kube-scheduler/scheduler.go
index 3b5541af0be..340ffde9f80 100644
--- a/plugin/cmd/kube-scheduler/scheduler.go
+++ b/plugin/cmd/kube-scheduler/scheduler.go
@@ -60,8 +60,14 @@ func main() {
 
 	go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
 
-	configFactory := &factory.ConfigFactory{Client: kubeClient}
-	config := configFactory.Create()
+	configFactory := factory.NewConfigFactory(kubeClient)
+	configFactory.AddPredicate("CreateOnMinion1", scheduler.CreateOnMinion1)
+	configFactory.AddPredicate("CreateOnMinion2", scheduler.CreateOnMinion2)
+
+	config, err := configFactory.Create([]string{"CreateOnMinion2"}, nil)
+	if err != nil {
+		glog.Fatalf("Failed to create scheduler configuration: %v", err)
+	}
 	s := scheduler.New(config)
 	s.Run()
 
diff --git a/plugin/pkg/scheduler/custom_predicates.go b/plugin/pkg/scheduler/custom_predicates.go
new file mode 100644
index 00000000000..bdee3950871
--- /dev/null
+++ b/plugin/pkg/scheduler/custom_predicates.go
@@ -0,0 +1,44 @@
+/*
+Copyright 2014 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/golang/glog"
+)
+
+func CreateOnMinion1(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
+	glog.V(2).Infof("custom predicate minion1 --> node: %s", node)
+	if node == "10.245.2.2" {
+		glog.V(2).Infof("custom predicate minion1 matched")
+		return true, nil
+	} else {
+		glog.V(2).Infof("custom predicate minion1 did not match")
+		return false, nil
+	}
+}
+
+func CreateOnMinion2(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
+	glog.V(2).Infof("custom predicate minion2 --> node: %s", node)
+	if node == "10.245.2.3" {
+		glog.V(2).Infof("custom predicate minion2 matched")
+		return true, nil
+	} else {
+		glog.V(2).Infof("custom predicate minion2 did not match")
+		return false, nil
+	}
+}
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index b827edc4148..db6dd2278c7 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -37,49 +37,82 @@ import (
 	"github.com/golang/glog"
 )
 
-// ConfigFactory knows how to fill out a scheduler config with its support functions.
-type ConfigFactory struct {
+// configFactory knows how to fill out a scheduler config with its support functions.
+type configFactory struct {
 	Client *client.Client
+	// queue for pods that need scheduling
+	PodQueue *cache.FIFO
+	// a means to list all scheduled pods
+	PodLister *storeToPodLister
+	// a means to list all minions
+	MinionLister *storeToMinionLister
+	// map of strings to predicate functions to be used
+	// to filter the minions for scheduling pods
+	PredicateMap map[string]algorithm.FitPredicate
+	// map of strings to priority functions to be used
+	// to prioritize the filtered minions for scheduling pods
+	PriorityMap map[string]algorithm.PriorityFunction
+}
+
+// NewConfigFactory initializes the factory.
+func NewConfigFactory(client *client.Client) *configFactory {
+	// initialize the factory struct
+	factory := &configFactory{Client: client,
+		PodQueue:     cache.NewFIFO(),
+		PodLister:    &storeToPodLister{cache.NewStore()},
+		MinionLister: &storeToMinionLister{cache.NewStore()},
+		PredicateMap: make(map[string]algorithm.FitPredicate),
+		PriorityMap:  make(map[string]algorithm.PriorityFunction),
+	}
+
+	// add default predicates
+	factory.addDefaultPredicates()
+
+	// add default priorities
+	factory.addDefaultPriorities()
+
+	return factory
 }
 
 // Create creates a scheduler and all support functions.
-func (factory *ConfigFactory) Create() *scheduler.Config {
+func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) {
+	if predicateKeys == nil {
+		glog.V(2).Infof("Custom predicates list not provided, using default predicates")
+		predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"}
+	}
+	predicateFuncs, err := factory.getPredicateFunctions(predicateKeys)
+	if err != nil {
+		return nil, err
+	}
+
+	if priorityKeys == nil {
+		glog.V(2).Infof("Custom priority list not provided, using default priorities")
+		priorityKeys = []string{"LeastRequestedPriority"}
+	}
+	priorityFuncs, err := factory.getPriorityFunctions(priorityKeys)
+	if err != nil {
+		return nil, err
+	}
+
 	// Watch and queue pods that need scheduling.
-	podQueue := cache.NewFIFO()
-	cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run()
+	cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run()
 
 	// Watch and cache all running pods. Scheduler needs to find all pods
 	// so it knows where it's safe to place a pod. Cache this locally.
-	podCache := cache.NewStore()
-	cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run()
+	cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run()
 
 	// Watch minions.
 	// Minions may be listed frequently, so provide a local up-to-date cache.
-	minionCache := cache.NewStore()
 	if false {
 		// Disable this code until minions support watches.
-		cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run()
+		cache.NewReflector(factory.createMinionLW(), &api.Minion{}, factory.MinionLister.Store).Run()
 	} else {
-		cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
+		cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run()
 	}
 
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	minionLister := &storeToMinionLister{minionCache}
-	algo := algorithm.NewGenericScheduler(
-		[]algorithm.FitPredicate{
-			// Fit is defined based on the absence of port conflicts.
-			algorithm.PodFitsPorts,
-			// Fit is determined by resource availability
-			algorithm.NewResourceFitPredicate(minionLister),
-			// Fit is determined by non-conflicting disk volumes
-			algorithm.NoDiskConflict,
-			// Fit is determined by node selector query
-			algorithm.NewSelectorMatchPredicate(minionLister),
-		},
-		// Prioritize nodes by least requested utilization.
-		algorithm.LeastRequestedPriority,
-		&storeToPodLister{podCache}, r)
+	algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs[0], factory.PodLister, r)
 
 	podBackoff := podBackoff{
 		perPodBackoff: map[string]*backoffEntry{},
@@ -87,19 +120,64 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
 	}
 
 	return &scheduler.Config{
-		MinionLister: minionLister,
+		MinionLister: factory.MinionLister,
 		Algorithm:    algo,
 		Binder:       &binder{factory.Client},
 		NextPod: func() *api.Pod {
-			pod := podQueue.Pop().(*api.Pod)
-			glog.V(2).Infof("About to try and schedule pod %v\n"+
-				"\tknown minions: %v\n"+
-				"\tknown scheduled pods: %v\n",
-				pod.Name, minionCache.ContainedIDs(), podCache.ContainedIDs())
+			pod := factory.PodQueue.Pop().(*api.Pod)
+			glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name)
+			glog.Errorf("glog.error --> About to try and schedule pod %v", pod.Name)
 			return pod
 		},
-		Error: factory.makeDefaultErrorFunc(&podBackoff, podQueue),
+		Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue),
+	}, nil
+}
+
+func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) {
+	var function algorithm.FitPredicate
+	predicates := []algorithm.FitPredicate{}
+	for _, key := range keys {
+		glog.Errorf("Adding predicate function for key: %s", key)
+		function = factory.PredicateMap[key]
+		if function == nil {
+			return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
+		}
+		predicates = append(predicates, function)
 	}
+	return predicates, nil
+}
+
+func (factory *configFactory) getPriorityFunctions(keys []string) ([]algorithm.PriorityFunction, error) {
+	var function algorithm.PriorityFunction
+	priorities := []algorithm.PriorityFunction{}
+	for _, key := range keys {
+		function = factory.PriorityMap[key]
+		if function == nil {
+			return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key)
+		}
+		priorities = append(priorities, function)
+	}
+	return priorities, nil
+}
+
+func (factory *configFactory) addDefaultPredicates() {
+	factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts)
+	factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister))
+	factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict)
+	factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister))
+}
+
+func (factory *configFactory) AddPredicate(key string, function algorithm.FitPredicate) {
+	factory.PredicateMap[key] = function
+}
+
+func (factory *configFactory) addDefaultPriorities() {
+	factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority)
+	factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority)
+}
+
+func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) {
+	factory.PriorityMap[key] = function
 }
 
 type listWatch struct {
@@ -129,7 +207,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
 
 // createUnassignedPodLW returns a listWatch that finds all pods that need to be
 // scheduled.
-func (factory *ConfigFactory) createUnassignedPodLW() *listWatch {
+func (factory *configFactory) createUnassignedPodLW() *listWatch {
 	return &listWatch{
 		client:        factory.Client,
 		fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
@@ -147,7 +225,7 @@ func parseSelectorOrDie(s string) labels.Selector {
 
 // createAssignedPodLW returns a listWatch that finds all pods that are
 // already scheduled.
-func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
+func (factory *configFactory) createAssignedPodLW() *listWatch {
 	return &listWatch{
 		client:        factory.Client,
 		fieldSelector: parseSelectorOrDie("DesiredState.Host!="),
@@ -156,7 +234,7 @@ func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
 }
 
 // createMinionLW returns a listWatch that gets all changes to minions.
-func (factory *ConfigFactory) createMinionLW() *listWatch {
+func (factory *configFactory) createMinionLW() *listWatch {
 	return &listWatch{
 		client:        factory.Client,
 		fieldSelector: parseSelectorOrDie(""),
@@ -165,7 +243,7 @@ func (factory *ConfigFactory) createMinionLW() *listWatch {
 }
 
 // pollMinions lists all minions and returns an enumerator for cache.Poller.
-func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
+func (factory *configFactory) pollMinions() (cache.Enumerator, error) {
 	list := &api.MinionList{}
 	err := factory.Client.Get().Path("minions").Do().Into(list)
 	if err != nil {
@@ -174,7 +252,7 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
 	return &minionEnumerator{list}, nil
 }
 
-func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
+func (factory *configFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
 	return func(pod *api.Pod, err error) {
 		glog.Errorf("Error scheduling %v: %v; retrying", pod.Name, err)
 		backoff.gc()
diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go
index d1bb337d6ca..6c7d504c495 100644
--- a/plugin/pkg/scheduler/factory/factory_test.go
+++ b/plugin/pkg/scheduler/factory/factory_test.go
@@ -42,12 +42,12 @@ func TestCreate(t *testing.T) {
 	server := httptest.NewServer(&handler)
 	defer server.Close()
 	client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
-	factory := ConfigFactory{client}
-	factory.Create()
+	factory := NewConfigFactory(client)
+	factory.Create(nil, nil)
 }
 
 func TestCreateLists(t *testing.T) {
-	factory := ConfigFactory{nil}
+	factory := NewConfigFactory(nil)
 	table := []struct {
 		location string
 		factory  func() *listWatch
@@ -85,7 +85,7 @@ func TestCreateLists(t *testing.T) {
 }
 
 func TestCreateWatches(t *testing.T) {
-	factory := ConfigFactory{nil}
+	factory := NewConfigFactory(nil)
 	table := []struct {
 		rv       string
 		location string
@@ -136,6 +136,7 @@ func TestCreateWatches(t *testing.T) {
 		server := httptest.NewServer(&handler)
 		defer server.Close()
 		factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
+
 		// This test merely tests that the correct request is made.
 		item.factory().Watch(item.rv)
 		handler.ValidateRequest(t, item.location, "GET", nil)
@@ -167,7 +168,7 @@ func TestPollMinions(t *testing.T) {
 	server := httptest.NewServer(mux)
 	defer server.Close()
 	client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
-	cf := ConfigFactory{client}
+	cf := NewConfigFactory(client)
 
 	ce, err := cf.pollMinions()
 	if err != nil {
@@ -194,7 +195,7 @@ func TestDefaultErrorFunc(t *testing.T) {
 	mux.Handle("/api/"+testapi.Version()+"/pods/foo", &handler)
 	server := httptest.NewServer(mux)
 	defer server.Close()
-	factory := ConfigFactory{client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})}
+	factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}))
 	queue := cache.NewFIFO()
 	podBackoff := podBackoff{
 		perPodBackoff: map[string]*backoffEntry{},
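
Usage note (not part of the diff): the new factory API is key-driven. Callers register FitPredicate and PriorityFunction implementations under string keys with AddPredicate/AddPriority, then pass the keys they want to Create. Passing nil for either list falls back to the defaults registered in NewConfigFactory, while an explicit list replaces the defaults entirely, which is why the kube-scheduler change above schedules with CreateOnMinion2 as its only predicate. The sketch below mirrors that flow; the rackLocalPredicate name and body, the hard-coded client address, and the trailing select are illustrative assumptions, not code from this PR.

package main

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
	"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
	"github.com/golang/glog"
)

// rackLocalPredicate is a hypothetical custom predicate with the FitPredicate
// signature used in this PR: it admits a pod only on one hard-coded minion.
func rackLocalPredicate(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
	return node == "10.245.2.2", nil
}

func main() {
	// Assumed client setup; the real kube-scheduler builds its client from flags.
	kubeClient := client.NewOrDie(&client.Config{Host: "http://127.0.0.1:8080"})

	configFactory := factory.NewConfigFactory(kubeClient)

	// Register the custom predicate under a key, then select predicates by key.
	configFactory.AddPredicate("RackLocal", rackLocalPredicate)

	// An explicit key list replaces the default predicate set; a nil priority
	// list keeps the default LeastRequestedPriority.
	config, err := configFactory.Create([]string{"RackLocal", "PodFitsPorts"}, nil)
	if err != nil {
		glog.Fatalf("Failed to create scheduler configuration: %v", err)
	}

	scheduler.New(config).Run()
	select {} // Run starts the scheduling loop in its own goroutine; block here.
}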