From 6b712cc700f3627d8bc56ca92f42434670d74a9c Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Wed, 19 Nov 2014 08:16:25 -0800 Subject: [PATCH 1/6] Scheduler changes for extensibility --- cmd/integration/integration.go | 7 +- pkg/scheduler/generic_scheduler.go | 6 + pkg/scheduler/predicates.go | 4 + pkg/standalone/standalone.go | 7 +- plugin/cmd/kube-scheduler/scheduler.go | 10 +- plugin/pkg/scheduler/custom_predicates.go | 44 ++++++ plugin/pkg/scheduler/factory/factory.go | 152 ++++++++++++++----- plugin/pkg/scheduler/factory/factory_test.go | 13 +- 8 files changed, 194 insertions(+), 49 deletions(-) create mode 100644 plugin/pkg/scheduler/custom_predicates.go diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index fc673027bc6..3a5026fd8c7 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -168,8 +168,11 @@ func startComponents(manifestURL string) (apiServerURL string) { handler.delegate = m.Handler // Scheduler - schedulerConfigFactory := &factory.ConfigFactory{cl} - schedulerConfig := schedulerConfigFactory.Create() + schedulerConfigFactory := factory.NewConfigFactory(cl) + schedulerConfig, err := schedulerConfigFactory.Create(nil, nil) + if err != nil { + glog.Fatal("Couldn't create scheduler config: %v", err) + } scheduler.New(schedulerConfig).Run() endpoints := service.NewEndpointController(cl) diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index a65b726e9f0..4661c68ff0a 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -38,10 +38,15 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str if err != nil { return "", err } + if len(minions.Items) == 0 { + return "", fmt.Errorf("no minions available to schedule pods") + } + filteredNodes, err := findNodesThatFit(pod, g.pods, g.predicates, minions) if err != nil { return "", err } + priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes)) if err != nil { return "", err @@ -49,6 +54,7 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str if len(priorityList) == 0 { return "", fmt.Errorf("failed to find a fit for pod: %v", pod) } + return g.selectHost(priorityList) } diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index 782ca86399d..f8566184555 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -74,6 +74,7 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { // TODO: migrate this into some per-volume specific code? func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { manifest := &(pod.Spec) + glog.Errorf("custom predicate NoDiskConflict --> node: %s", node) for ix := range manifest.Volumes { for podIx := range existingPods { if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) { @@ -104,6 +105,7 @@ func getResourceRequest(pod *api.Pod) resourceRequest { // PodFitsResources calculates fit based on requested, rather than used resources func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + glog.Errorf("custom predicate PodFitsResources --> node: %s", node) podRequest := getResourceRequest(&pod) if podRequest.milliCPU == 0 && podRequest.memory == 0 { // no resources requested always fits. 
@@ -152,6 +154,7 @@ type NodeSelector struct { func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { if len(pod.Spec.NodeSelector) == 0 { + glog.Errorf("custom predicate PodSelectorMatches --> node: %s", node) return true, nil } selector := labels.SelectorFromSet(pod.Spec.NodeSelector) @@ -163,6 +166,7 @@ func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, n } func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + glog.Errorf("custom predicate PodFitsPorts --> node: %s", node) existingPorts := getUsedPorts(existingPods...) wantPorts := getUsedPorts(pod) for wport := range wantPorts { diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 7c3b09c9369..7941a8eaeb6 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -104,8 +104,11 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p // RunScheduler starts up a scheduler in it's own goroutine func RunScheduler(cl *client.Client) { // Scheduler - schedulerConfigFactory := &factory.ConfigFactory{cl} - schedulerConfig := schedulerConfigFactory.Create() + schedulerConfigFactory := factory.NewConfigFactory(cl) + schedulerConfig, err := schedulerConfigFactory.Create(nil, nil) + if err != nil { + glog.Fatal("Couldn't create scheduler config: %v", err) + } scheduler.New(schedulerConfig).Run() } diff --git a/plugin/cmd/kube-scheduler/scheduler.go b/plugin/cmd/kube-scheduler/scheduler.go index 3b5541af0be..340ffde9f80 100644 --- a/plugin/cmd/kube-scheduler/scheduler.go +++ b/plugin/cmd/kube-scheduler/scheduler.go @@ -60,8 +60,14 @@ func main() { go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) - configFactory := &factory.ConfigFactory{Client: kubeClient} - config := configFactory.Create() + configFactory := factory.NewConfigFactory(kubeClient) + configFactory.AddPredicate("CreateOnMinion1", scheduler.CreateOnMinion1) + configFactory.AddPredicate("CreateOnMinion2", scheduler.CreateOnMinion2) + + config, err := configFactory.Create([]string{"CreateOnMinion2"}, nil) + if err != nil { + glog.Fatalf("Failed to create scheduler configuration: %v", err) + } s := scheduler.New(config) s.Run() diff --git a/plugin/pkg/scheduler/custom_predicates.go b/plugin/pkg/scheduler/custom_predicates.go new file mode 100644 index 00000000000..bdee3950871 --- /dev/null +++ b/plugin/pkg/scheduler/custom_predicates.go @@ -0,0 +1,44 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/golang/glog" +) + +func CreateOnMinion1(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + glog.V(2).Infof("custom predicate minion1 --> node: %s", node) + if node == "10.245.2.2" { + glog.V(2).Infof("custom predicate minion1 matched") + return true, nil + } else { + glog.V(2).Infof("custom predicate minion1 did not match") + return false, nil + } +} + +func CreateOnMinion2(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + glog.V(2).Infof("custom predicate minion2 --> node: %s", node) + if node == "10.245.2.3" { + glog.V(2).Infof("custom predicate minion2 matched") + return true, nil + } else { + glog.V(2).Infof("custom predicate minion2 did not match") + return false, nil + } +} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index b827edc4148..db6dd2278c7 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -37,49 +37,82 @@ import ( "github.com/golang/glog" ) -// ConfigFactory knows how to fill out a scheduler config with its support functions. -type ConfigFactory struct { +// configFactory knows how to fill out a scheduler config with its support functions. +type configFactory struct { Client *client.Client + // queue for pods that need scheduling + PodQueue *cache.FIFO + // a means to list all scheduled pods + PodLister *storeToPodLister + // a means to list all minions + MinionLister *storeToMinionLister + // map of strings to predicate functions to be used + // to filter the minions for scheduling pods + PredicateMap map[string]algorithm.FitPredicate + // map of strings to priority functions to be used + // to prioritize the filtered minions for scheduling pods + PriorityMap map[string]algorithm.PriorityFunction +} + +// NewConfigFactory initializes the factory. +func NewConfigFactory(client *client.Client) *configFactory { + // initialize the factory struct + factory := &configFactory{Client: client, + PodQueue: cache.NewFIFO(), + PodLister: &storeToPodLister{cache.NewStore()}, + MinionLister: &storeToMinionLister{cache.NewStore()}, + PredicateMap: make(map[string]algorithm.FitPredicate), + PriorityMap: make(map[string]algorithm.PriorityFunction), + } + + // add default predicates + factory.addDefaultPredicates() + + // add default predicates + factory.addDefaultPriorities() + + return factory } // Create creates a scheduler and all support functions. -func (factory *ConfigFactory) Create() *scheduler.Config { +func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) { + if predicateKeys == nil { + glog.V(2).Infof("Custom predicates list not provided, using default predicates") + predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"} + } + predicateFuncs, err := factory.getPredicateFunctions(predicateKeys) + if err != nil { + return nil, err + } + + if priorityKeys == nil { + glog.V(2).Infof("Custom priority list not provided, using default priorities") + priorityKeys = []string{"LeastRequestedPriority"} + } + priorityFuncs, err := factory.getPriorityFunctions(priorityKeys) + if err != nil { + return nil, err + } + // Watch and queue pods that need scheduling. 
- podQueue := cache.NewFIFO() - cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run() + cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run() // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. - podCache := cache.NewStore() - cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run() + cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. - minionCache := cache.NewStore() if false { // Disable this code until minions support watches. - cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run() + cache.NewReflector(factory.createMinionLW(), &api.Minion{}, factory.MinionLister.Store).Run() } else { - cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run() + cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run() } r := rand.New(rand.NewSource(time.Now().UnixNano())) - minionLister := &storeToMinionLister{minionCache} - algo := algorithm.NewGenericScheduler( - []algorithm.FitPredicate{ - // Fit is defined based on the absence of port conflicts. - algorithm.PodFitsPorts, - // Fit is determined by resource availability - algorithm.NewResourceFitPredicate(minionLister), - // Fit is determined by non-conflicting disk volumes - algorithm.NoDiskConflict, - // Fit is determined by node selector query - algorithm.NewSelectorMatchPredicate(minionLister), - }, - // Prioritize nodes by least requested utilization. - algorithm.LeastRequestedPriority, - &storeToPodLister{podCache}, r) + algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs[0], factory.PodLister, r) podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{}, @@ -87,19 +120,64 @@ func (factory *ConfigFactory) Create() *scheduler.Config { } return &scheduler.Config{ - MinionLister: minionLister, + MinionLister: factory.MinionLister, Algorithm: algo, Binder: &binder{factory.Client}, NextPod: func() *api.Pod { - pod := podQueue.Pop().(*api.Pod) - glog.V(2).Infof("About to try and schedule pod %v\n"+ - "\tknown minions: %v\n"+ - "\tknown scheduled pods: %v\n", - pod.Name, minionCache.ContainedIDs(), podCache.ContainedIDs()) + pod := factory.PodQueue.Pop().(*api.Pod) + glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name) + glog.Errorf("glog.error --> About to try and schedule pod %v", pod.Name) return pod }, - Error: factory.makeDefaultErrorFunc(&podBackoff, podQueue), + Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue), + }, nil +} + +func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) { + var function algorithm.FitPredicate + predicates := []algorithm.FitPredicate{} + for _, key := range keys { + glog.Errorf("Adding predicate function for key: %s", key) + function = factory.PredicateMap[key] + if function == nil { + return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key) + } + predicates = append(predicates, function) } + return predicates, nil +} + +func (factory *configFactory) getPriorityFunctions(keys []string) ([]algorithm.PriorityFunction, error) { + var function algorithm.PriorityFunction + priorities := []algorithm.PriorityFunction{} + for _, key := range keys { + function = factory.PriorityMap[key] + if function == nil { + 
return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key) + } + priorities = append(priorities, function) + } + return priorities, nil +} + +func (factory *configFactory) addDefaultPredicates() { + factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts) + factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)) + factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict) + factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)) +} + +func (factory *configFactory) AddPredicate(key string, function algorithm.FitPredicate) { + factory.PredicateMap[key] = function +} + +func (factory *configFactory) addDefaultPriorities() { + factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority) + factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority) +} + +func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) { + factory.PriorityMap[key] = function } type listWatch struct { @@ -129,7 +207,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) { // createUnassignedPodLW returns a listWatch that finds all pods that need to be // scheduled. -func (factory *ConfigFactory) createUnassignedPodLW() *listWatch { +func (factory *configFactory) createUnassignedPodLW() *listWatch { return &listWatch{ client: factory.Client, fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), @@ -147,7 +225,7 @@ func parseSelectorOrDie(s string) labels.Selector { // createAssignedPodLW returns a listWatch that finds all pods that are // already scheduled. -func (factory *ConfigFactory) createAssignedPodLW() *listWatch { +func (factory *configFactory) createAssignedPodLW() *listWatch { return &listWatch{ client: factory.Client, fieldSelector: parseSelectorOrDie("DesiredState.Host!="), @@ -156,7 +234,7 @@ func (factory *ConfigFactory) createAssignedPodLW() *listWatch { } // createMinionLW returns a listWatch that gets all changes to minions. -func (factory *ConfigFactory) createMinionLW() *listWatch { +func (factory *configFactory) createMinionLW() *listWatch { return &listWatch{ client: factory.Client, fieldSelector: parseSelectorOrDie(""), @@ -165,7 +243,7 @@ func (factory *ConfigFactory) createMinionLW() *listWatch { } // pollMinions lists all minions and returns an enumerator for cache.Poller. 
-func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { +func (factory *configFactory) pollMinions() (cache.Enumerator, error) { list := &api.MinionList{} err := factory.Client.Get().Path("minions").Do().Into(list) if err != nil { @@ -174,7 +252,7 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &minionEnumerator{list}, nil } -func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { +func (factory *configFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { return func(pod *api.Pod, err error) { glog.Errorf("Error scheduling %v: %v; retrying", pod.Name, err) backoff.gc() diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index d1bb337d6ca..6c7d504c495 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -42,12 +42,12 @@ func TestCreate(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - factory := ConfigFactory{client} - factory.Create() + factory := NewConfigFactory(client) + factory.Create(nil, nil) } func TestCreateLists(t *testing.T) { - factory := ConfigFactory{nil} + factory := NewConfigFactory(nil) table := []struct { location string factory func() *listWatch @@ -85,7 +85,7 @@ func TestCreateLists(t *testing.T) { } func TestCreateWatches(t *testing.T) { - factory := ConfigFactory{nil} + factory := NewConfigFactory(nil) table := []struct { rv string location string @@ -136,6 +136,7 @@ func TestCreateWatches(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) + // This test merely tests that the correct request is made. 
item.factory().Watch(item.rv) handler.ValidateRequest(t, item.location, "GET", nil) @@ -167,7 +168,7 @@ func TestPollMinions(t *testing.T) { server := httptest.NewServer(mux) defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - cf := ConfigFactory{client} + cf := NewConfigFactory(client) ce, err := cf.pollMinions() if err != nil { @@ -194,7 +195,7 @@ func TestDefaultErrorFunc(t *testing.T) { mux.Handle("/api/"+testapi.Version()+"/pods/foo", &handler) server := httptest.NewServer(mux) defer server.Close() - factory := ConfigFactory{client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})} + factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})) queue := cache.NewFIFO() podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{}, From 1eb28b0aa3d52f5bd0e0fbb12b8939ee69d66870 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Thu, 20 Nov 2014 14:42:31 -0800 Subject: [PATCH 2/6] Scheduler changes to allow multiple priority functions --- pkg/scheduler/generic_scheduler.go | 37 +++++++++++++++----- pkg/scheduler/generic_scheduler_test.go | 32 ++++++++--------- pkg/scheduler/predicates.go | 4 --- pkg/scheduler/spreading.go | 28 ++++++++++++--- pkg/scheduler/spreading_test.go | 6 ++-- plugin/cmd/kube-scheduler/scheduler.go | 5 +-- plugin/pkg/scheduler/factory/factory.go | 9 +++-- plugin/pkg/scheduler/factory/factory_test.go | 12 +++---- 8 files changed, 83 insertions(+), 50 deletions(-) diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index 4661c68ff0a..1f119ce5caf 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -26,11 +26,11 @@ import ( ) type genericScheduler struct { - predicates []FitPredicate - prioritizer PriorityFunction - pods PodLister - random *rand.Rand - randomLock sync.Mutex + predicates []FitPredicate + prioritizers []PriorityFunction + pods PodLister + random *rand.Rand + randomLock sync.Mutex } func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { @@ -47,7 +47,7 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str return "", err } - priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes)) + priorityList, err := prioritizeNodes(pod, g.pods, g.prioritizers, FakeMinionLister(filteredNodes)) if err != nil { return "", err } @@ -97,6 +97,27 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicat return api.MinionList{Items: filtered}, nil } +func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityFunction, minionLister MinionLister) (HostPriorityList, error) { + result := HostPriorityList{} + combinedScores := map[string]int{} + for _, priority := range priorities { + prioritizedList, err := priority(pod, podLister, minionLister) + if err != nil { + return HostPriorityList{}, err + } + if len(priorities) == 1 { + return prioritizedList, nil + } + for _, hostEntry := range prioritizedList { + combinedScores[hostEntry.host] += hostEntry.score + } + } + for host, score := range combinedScores { + result = append(result, HostPriority{host: host, score: score}) + } + return result, nil +} + func getMinHosts(list HostPriorityList) []string { result := []string{} for _, hostEntry := range list { @@ -127,10 +148,10 @@ func EqualPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) return result, nil } -func 
NewGenericScheduler(predicates []FitPredicate, prioritizer PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { +func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { return &genericScheduler{ predicates: predicates, - prioritizer: prioritizer, + prioritizers: prioritizers, pods: pods, random: random, } diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go index c5cbbcdca33..a6819160247 100644 --- a/pkg/scheduler/generic_scheduler_test.go +++ b/pkg/scheduler/generic_scheduler_test.go @@ -138,57 +138,57 @@ func TestSelectHost(t *testing.T) { func TestGenericScheduler(t *testing.T) { tests := []struct { predicates []FitPredicate - prioritizer PriorityFunction + prioritizers []PriorityFunction nodes []string pod api.Pod expectedHost string expectsErr bool }{ { - predicates: []FitPredicate{falsePredicate}, - prioritizer: EqualPriority, - nodes: []string{"machine1", "machine2"}, - expectsErr: true, + predicates: []FitPredicate{falsePredicate}, + prioritizers: []PriorityFunction{EqualPriority}, + nodes: []string{"machine1", "machine2"}, + expectsErr: true, }, { - predicates: []FitPredicate{truePredicate}, - prioritizer: EqualPriority, - nodes: []string{"machine1", "machine2"}, + predicates: []FitPredicate{truePredicate}, + prioritizers: []PriorityFunction{EqualPriority}, + nodes: []string{"machine1", "machine2"}, // Random choice between both, the rand seeded above with zero, chooses "machine2" expectedHost: "machine2", }, { // Fits on a machine where the pod ID matches the machine name predicates: []FitPredicate{matchesPredicate}, - prioritizer: EqualPriority, + prioritizers: []PriorityFunction{EqualPriority}, nodes: []string{"machine1", "machine2"}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}}, expectedHost: "machine2", }, { predicates: []FitPredicate{truePredicate}, - prioritizer: numericPriority, + prioritizers: []PriorityFunction{numericPriority}, nodes: []string{"3", "2", "1"}, expectedHost: "1", }, { predicates: []FitPredicate{matchesPredicate}, - prioritizer: numericPriority, + prioritizers: []PriorityFunction{numericPriority}, nodes: []string{"3", "2", "1"}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, expectedHost: "2", }, { - predicates: []FitPredicate{truePredicate, falsePredicate}, - prioritizer: numericPriority, - nodes: []string{"3", "2", "1"}, - expectsErr: true, + predicates: []FitPredicate{truePredicate, falsePredicate}, + prioritizers: []PriorityFunction{numericPriority}, + nodes: []string{"3", "2", "1"}, + expectsErr: true, }, } for _, test := range tests { random := rand.New(rand.NewSource(0)) - scheduler := NewGenericScheduler(test.predicates, test.prioritizer, FakePodLister([]api.Pod{}), random) + scheduler := NewGenericScheduler(test.predicates, test.prioritizers, FakePodLister([]api.Pod{}), random) machine, err := scheduler.Schedule(test.pod, FakeMinionLister(makeMinionList(test.nodes))) if test.expectsErr { if err == nil { diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index f8566184555..782ca86399d 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -74,7 +74,6 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { // TODO: migrate this into some per-volume specific code? 
func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { manifest := &(pod.Spec) - glog.Errorf("custom predicate NoDiskConflict --> node: %s", node) for ix := range manifest.Volumes { for podIx := range existingPods { if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) { @@ -105,7 +104,6 @@ func getResourceRequest(pod *api.Pod) resourceRequest { // PodFitsResources calculates fit based on requested, rather than used resources func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { - glog.Errorf("custom predicate PodFitsResources --> node: %s", node) podRequest := getResourceRequest(&pod) if podRequest.milliCPU == 0 && podRequest.memory == 0 { // no resources requested always fits. @@ -154,7 +152,6 @@ type NodeSelector struct { func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { if len(pod.Spec.NodeSelector) == 0 { - glog.Errorf("custom predicate PodSelectorMatches --> node: %s", node) return true, nil } selector := labels.SelectorFromSet(pod.Spec.NodeSelector) @@ -166,7 +163,6 @@ func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, n } func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { - glog.Errorf("custom predicate PodFitsPorts --> node: %s", node) existingPorts := getUsedPorts(existingPods...) wantPorts := getUsedPorts(pod) for wport := range wantPorts { diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 751d7f45195..cda90d460b5 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -18,6 +18,7 @@ package scheduler import ( "math/rand" + "sort" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -37,18 +38,37 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini return nil, err } + var maxCount int + var fScore float32 counts := map[string]int{} - for _, pod := range pods { - counts[pod.Status.Host]++ + if len(pods) > 0 { + for _, pod := range pods { + counts[pod.Status.Host]++ + } + + // doing this separately since the pod count can be much higher + // than the filtered minion count + values := make([]int, len(counts)) + idx := 0 + for _, count := range counts { + values[idx] = count + idx++ + } + sort.Sort(sort.IntSlice(values)) + maxCount = values[len(values)-1] } result := []HostPriority{} + //score int for _, minion := range minions.Items { - result = append(result, HostPriority{host: minion.Name, score: counts[minion.Name]}) + if maxCount > 0 { + fScore = 100 * ( float32(counts[minion.Name]) / float32(maxCount) ) + } + result = append(result, HostPriority{host: minion.Name, score: int(fScore)}) } return result, nil } func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { - return NewGenericScheduler(predicates, CalculateSpreadPriority, podLister, random) + return NewGenericScheduler(predicates, []PriorityFunction{CalculateSpreadPriority}, podLister, random) } diff --git a/pkg/scheduler/spreading_test.go b/pkg/scheduler/spreading_test.go index 4301fd40f7b..2d71e4a9d81 100644 --- a/pkg/scheduler/spreading_test.go +++ b/pkg/scheduler/spreading_test.go @@ -71,7 +71,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, 
{"machine2", 1}}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 100}}, test: "one label match", }, { @@ -82,7 +82,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 1}, {"machine2", 1}}, + expectedList: []HostPriority{{"machine1", 100}, {"machine2", 100}}, test: "two label matches on different machines", }, { @@ -94,7 +94,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 1}, {"machine2", 2}}, + expectedList: []HostPriority{{"machine1", 50}, {"machine2", 100}}, test: "three label matches", }, } diff --git a/plugin/cmd/kube-scheduler/scheduler.go b/plugin/cmd/kube-scheduler/scheduler.go index 340ffde9f80..75af1276ea4 100644 --- a/plugin/cmd/kube-scheduler/scheduler.go +++ b/plugin/cmd/kube-scheduler/scheduler.go @@ -61,10 +61,7 @@ func main() { go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) configFactory := factory.NewConfigFactory(kubeClient) - configFactory.AddPredicate("CreateOnMinion1", scheduler.CreateOnMinion1) - configFactory.AddPredicate("CreateOnMinion2", scheduler.CreateOnMinion2) - - config, err := configFactory.Create([]string{"CreateOnMinion2"}, nil) + config, err := configFactory.Create(nil, nil) if err != nil { glog.Fatalf("Failed to create scheduler configuration: %v", err) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index db6dd2278c7..086002f95b3 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -112,7 +112,7 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch r := rand.New(rand.NewSource(time.Now().UnixNano())) - algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs[0], factory.PodLister, r) + algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs, factory.PodLister, r) podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{}, @@ -126,7 +126,6 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch NextPod: func() *api.Pod { pod := factory.PodQueue.Pop().(*api.Pod) glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name) - glog.Errorf("glog.error --> About to try and schedule pod %v", pod.Name) return pod }, Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue), @@ -137,7 +136,6 @@ func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm. 
var function algorithm.FitPredicate predicates := []algorithm.FitPredicate{} for _, key := range keys { - glog.Errorf("Adding predicate function for key: %s", key) function = factory.PredicateMap[key] if function == nil { return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key) @@ -174,6 +172,7 @@ func (factory *configFactory) AddPredicate(key string, function algorithm.FitPre func (factory *configFactory) addDefaultPriorities() { factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority) factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority) + factory.AddPriority("EqualPriority", algorithm.EqualPriority) } func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) { @@ -210,7 +209,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) { func (factory *configFactory) createUnassignedPodLW() *listWatch { return &listWatch{ client: factory.Client, - fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), + fieldSelector: labels.Set{"Status.Host": ""}.AsSelector(), resource: "pods", } } @@ -228,7 +227,7 @@ func parseSelectorOrDie(s string) labels.Selector { func (factory *configFactory) createAssignedPodLW() *listWatch { return &listWatch{ client: factory.Client, - fieldSelector: parseSelectorOrDie("DesiredState.Host!="), + fieldSelector: parseSelectorOrDie("Status.Host!="), resource: "pods", } } diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 6c7d504c495..7d4e2db3de6 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -59,12 +59,12 @@ func TestCreateLists(t *testing.T) { }, // Assigned pod { - location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host!%3D", + location: "/api/" + testapi.Version() + "/pods?fields=Status.Host!%3D", factory: factory.createAssignedPodLW, }, // Unassigned pod { - location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D", + location: "/api/" + testapi.Version() + "/pods?fields=Status.Host%3D", factory: factory.createUnassignedPodLW, }, } @@ -108,21 +108,21 @@ func TestCreateWatches(t *testing.T) { // Assigned pod watches { rv: "", - location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=", + location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=", factory: factory.createAssignedPodLW, }, { rv: "42", - location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", + location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=42", factory: factory.createAssignedPodLW, }, // Unassigned pod watches { rv: "", - location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=", + location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=", factory: factory.createUnassignedPodLW, }, { rv: "42", - location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", + location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=42", factory: factory.createUnassignedPodLW, }, } From 7f374030c1550014cc7f8687716c6e755e22fe7f Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Thu, 20 Nov 2014 15:06:56 -0800 Subject: [PATCH 3/6] Fixing gofmt errors --- pkg/scheduler/generic_scheduler.go | 6 +++--- 
pkg/scheduler/spreading.go | 4 ++-- plugin/pkg/scheduler/factory/factory.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index 1f119ce5caf..0d5673925d4 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -150,9 +150,9 @@ func EqualPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { return &genericScheduler{ - predicates: predicates, + predicates: predicates, prioritizers: prioritizers, - pods: pods, - random: random, + pods: pods, + random: random, } } diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index cda90d460b5..7bc69240ee3 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -50,7 +50,7 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini // than the filtered minion count values := make([]int, len(counts)) idx := 0 - for _, count := range counts { + for _, count := range counts { values[idx] = count idx++ } @@ -62,7 +62,7 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini //score int for _, minion := range minions.Items { if maxCount > 0 { - fScore = 100 * ( float32(counts[minion.Name]) / float32(maxCount) ) + fScore = 100 * (float32(counts[minion.Name]) / float32(maxCount)) } result = append(result, HostPriority{host: minion.Name, score: int(fScore)}) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 086002f95b3..ba6438067b9 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -58,11 +58,11 @@ type configFactory struct { func NewConfigFactory(client *client.Client) *configFactory { // initialize the factory struct factory := &configFactory{Client: client, - PodQueue: cache.NewFIFO(), - PodLister: &storeToPodLister{cache.NewStore()}, + PodQueue: cache.NewFIFO(), + PodLister: &storeToPodLister{cache.NewStore()}, MinionLister: &storeToMinionLister{cache.NewStore()}, PredicateMap: make(map[string]algorithm.FitPredicate), - PriorityMap: make(map[string]algorithm.PriorityFunction), + PriorityMap: make(map[string]algorithm.PriorityFunction), } // add default predicates From 13831856c90cb7eabab1a3efbcf7fb85b90ac2c6 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Tue, 25 Nov 2014 18:10:25 -0800 Subject: [PATCH 4/6] Combining scheduler priority functions using weighted averages --- pkg/scheduler/generic_scheduler.go | 28 +++---- pkg/scheduler/generic_scheduler_test.go | 77 +++++++++++++++----- pkg/scheduler/priorities.go | 14 ++-- pkg/scheduler/priorities_test.go | 8 +- pkg/scheduler/spreading.go | 9 ++- pkg/scheduler/spreading_test.go | 12 +-- pkg/scheduler/types.go | 5 ++ plugin/pkg/scheduler/custom_predicates.go | 44 ----------- plugin/pkg/scheduler/factory/factory.go | 54 +++++++------- plugin/pkg/scheduler/factory/factory_test.go | 12 +-- 10 files changed, 136 insertions(+), 127 deletions(-) delete mode 100644 plugin/pkg/scheduler/custom_predicates.go diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index 0d5673925d4..d9d7af2558a 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -27,7 +27,7 @@ import ( type genericScheduler struct { predicates []FitPredicate - prioritizers []PriorityFunction + prioritizers 
[]PriorityConfig pods PodLister random *rand.Rand randomLock sync.Mutex @@ -62,7 +62,7 @@ func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, er if len(priorityList) == 0 { return "", fmt.Errorf("empty priorityList") } - sort.Sort(priorityList) + sort.Sort(sort.Reverse(priorityList)) hosts := getMinHosts(priorityList) g.randomLock.Lock() @@ -97,19 +97,21 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicat return api.MinionList{Items: filtered}, nil } -func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityFunction, minionLister MinionLister) (HostPriorityList, error) { +func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityConfig, minionLister MinionLister) (HostPriorityList, error) { result := HostPriorityList{} combinedScores := map[string]int{} for _, priority := range priorities { - prioritizedList, err := priority(pod, podLister, minionLister) - if err != nil { - return HostPriorityList{}, err - } - if len(priorities) == 1 { - return prioritizedList, nil - } - for _, hostEntry := range prioritizedList { - combinedScores[hostEntry.host] += hostEntry.score + weight := priority.Weight + // skip the priority function if the weight is specified as 0 + if weight > 0 { + priorityFunc := priority.Function + prioritizedList, err := priorityFunc(pod, podLister, minionLister) + if err != nil { + return HostPriorityList{}, err + } + for _, hostEntry := range prioritizedList { + combinedScores[hostEntry.host] += hostEntry.score * weight + } } } for host, score := range combinedScores { @@ -148,7 +150,7 @@ func EqualPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) return result, nil } -func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { +func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityConfig, pods PodLister, random *rand.Rand) Scheduler { return &genericScheduler{ predicates: predicates, prioritizers: prioritizers, diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go index a6819160247..977c28c576b 100644 --- a/pkg/scheduler/generic_scheduler_test.go +++ b/pkg/scheduler/generic_scheduler_test.go @@ -18,6 +18,7 @@ package scheduler import ( "fmt" + "math" "math/rand" "strconv" "testing" @@ -59,6 +60,29 @@ func numericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister return result, nil } +func reverseNumericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var maxScore float64 + minScore := math.MaxFloat64 + reverseResult := []HostPriority{} + result, err := numericPriority(pod, podLister, minionLister) + if err != nil { + return nil, err + } + + for _, hostPriority := range result { + maxScore = math.Max(maxScore, float64(hostPriority.score)) + minScore = math.Min(minScore, float64(hostPriority.score)) + } + for _, hostPriority := range result { + reverseResult = append(reverseResult, HostPriority{ + host: hostPriority.host, + score: int(maxScore + minScore - float64(hostPriority.score)), + }) + } + + return reverseResult, nil +} + func makeMinionList(nodeNames []string) api.MinionList { result := api.MinionList{ Items: make([]api.Minion, len(nodeNames)), @@ -81,28 +105,28 @@ func TestSelectHost(t *testing.T) { {host: "machine1.1", score: 1}, {host: "machine2.1", score: 2}, }, - possibleHosts: util.NewStringSet("machine1.1"), + possibleHosts: 
util.NewStringSet("machine2.1"), expectsErr: false, }, // equal scores { list: []HostPriority{ {host: "machine1.1", score: 1}, - {host: "machine1.2", score: 1}, - {host: "machine1.3", score: 1}, + {host: "machine1.2", score: 2}, + {host: "machine1.3", score: 2}, {host: "machine2.1", score: 2}, }, - possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"), + possibleHosts: util.NewStringSet("machine1.2", "machine1.3", "machine2.1"), expectsErr: false, }, // out of order scores { list: []HostPriority{ - {host: "machine1.1", score: 1}, - {host: "machine1.2", score: 1}, + {host: "machine1.1", score: 3}, + {host: "machine1.2", score: 3}, {host: "machine2.1", score: 2}, - {host: "machine3.1", score: 3}, - {host: "machine1.3", score: 1}, + {host: "machine3.1", score: 1}, + {host: "machine1.3", score: 3}, }, possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"), expectsErr: false, @@ -137,8 +161,9 @@ func TestSelectHost(t *testing.T) { func TestGenericScheduler(t *testing.T) { tests := []struct { + name string predicates []FitPredicate - prioritizers []PriorityFunction + prioritizers []PriorityConfig nodes []string pod api.Pod expectedHost string @@ -146,43 +171,57 @@ func TestGenericScheduler(t *testing.T) { }{ { predicates: []FitPredicate{falsePredicate}, - prioritizers: []PriorityFunction{EqualPriority}, + prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}}, nodes: []string{"machine1", "machine2"}, expectsErr: true, + name: "test 1", }, { predicates: []FitPredicate{truePredicate}, - prioritizers: []PriorityFunction{EqualPriority}, + prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}}, nodes: []string{"machine1", "machine2"}, - // Random choice between both, the rand seeded above with zero, chooses "machine2" - expectedHost: "machine2", + // Random choice between both, the rand seeded above with zero, chooses "machine1" + expectedHost: "machine1", + name: "test 2", }, { // Fits on a machine where the pod ID matches the machine name predicates: []FitPredicate{matchesPredicate}, - prioritizers: []PriorityFunction{EqualPriority}, + prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}}, nodes: []string{"machine1", "machine2"}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}}, expectedHost: "machine2", + name: "test 3", }, { predicates: []FitPredicate{truePredicate}, - prioritizers: []PriorityFunction{numericPriority}, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}}, nodes: []string{"3", "2", "1"}, - expectedHost: "1", + expectedHost: "3", + name: "test 4", }, { predicates: []FitPredicate{matchesPredicate}, - prioritizers: []PriorityFunction{numericPriority}, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}}, nodes: []string{"3", "2", "1"}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, expectedHost: "2", + name: "test 5", + }, + { + predicates: []FitPredicate{truePredicate}, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}, {Function: reverseNumericPriority, Weight: 2}}, + nodes: []string{"3", "2", "1"}, + pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, + expectedHost: "1", + name: "test 6", }, { predicates: []FitPredicate{truePredicate, falsePredicate}, - prioritizers: []PriorityFunction{numericPriority}, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}}, nodes: []string{"3", "2", "1"}, expectsErr: true, + name: "test 7", }, } @@ -199,7 +238,7 @@ func TestGenericScheduler(t *testing.T) { 
t.Errorf("Unexpected error: %v", err) } if test.expectedHost != machine { - t.Errorf("Expected: %s, Saw: %s", test.expectedHost, machine) + t.Errorf("Failed : %s, Expected: %s, Saw: %s", test.name, test.expectedHost, machine) } } } diff --git a/pkg/scheduler/priorities.go b/pkg/scheduler/priorities.go index baa65c96b22..25b41aa6a15 100644 --- a/pkg/scheduler/priorities.go +++ b/pkg/scheduler/priorities.go @@ -22,11 +22,13 @@ import ( "github.com/golang/glog" ) -func calculatePercentage(requested, capacity int) int { +// the unused capacity is calculated on a scale of 0-10 +// 0 being the lowest priority and 10 being the highest +func calculateScore(requested, capacity int) int { if capacity == 0 { return 0 } - return (requested * 100) / capacity + return ((capacity - requested) * 10) / capacity } // Calculate the occupancy on a node. 'node' has information about the resources on the node. @@ -41,13 +43,13 @@ func calculateOccupancy(node api.Minion, pods []api.Pod) HostPriority { } } - percentageCPU := calculatePercentage(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0)) - percentageMemory := calculatePercentage(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0)) - glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Percentage:(%d\\%m, %d\\%)", totalCPU, totalMemory, percentageCPU, percentageMemory) + cpuScore := calculateScore(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0)) + memoryScore := calculateScore(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0)) + glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Score:(%d, %d)", totalCPU, totalMemory, cpuScore, memoryScore) return HostPriority{ host: node.Name, - score: int((percentageCPU + percentageMemory) / 2), + score: int((cpuScore + memoryScore) / 2), } } diff --git a/pkg/scheduler/priorities_test.go b/pkg/scheduler/priorities_test.go index 2cde2ff5d52..976da176445 100644 --- a/pkg/scheduler/priorities_test.go +++ b/pkg/scheduler/priorities_test.go @@ -74,12 +74,12 @@ func TestLeastRequested(t *testing.T) { }{ { nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "nothing scheduled", }, { nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "no resources requested", pods: []api.Pod{ {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, @@ -90,8 +90,8 @@ func TestLeastRequested(t *testing.T) { }, { nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, - expectedList: []HostPriority{{"machine1", 37 /* int(75% / 2) */}, {"machine2", 62 /* int( 75% + 50% / 2) */}}, - test: "no resources requested", + expectedList: []HostPriority{{"machine1", 6 /* int(200%-75% / 2) */}, {"machine2", 3 /* int( 200%-125% / 2) */}}, + test: "resources requested", pods: []api.Pod{ {Spec: cpuOnly, Status: api.PodStatus{Host: "machine1"}}, {Spec: cpuAndMemory, Status: api.PodStatus{Host: "machine2"}}, diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 7bc69240ee3..314a1d47f13 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -39,7 +39,7 @@ 
func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini } var maxCount int - var fScore float32 + var fScore float32 = 10.0 counts := map[string]int{} if len(pods) > 0 { for _, pod := range pods { @@ -59,10 +59,11 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini } result := []HostPriority{} - //score int + //score int - scale of 0-10 + // 0 being the lowest priority and 10 being the highest for _, minion := range minions.Items { if maxCount > 0 { - fScore = 100 * (float32(counts[minion.Name]) / float32(maxCount)) + fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount)) } result = append(result, HostPriority{host: minion.Name, score: int(fScore)}) } @@ -70,5 +71,5 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini } func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { - return NewGenericScheduler(predicates, []PriorityFunction{CalculateSpreadPriority}, podLister, random) + return NewGenericScheduler(predicates, []PriorityConfig{{Function: CalculateSpreadPriority, Weight: 1}}, podLister, random) } diff --git a/pkg/scheduler/spreading_test.go b/pkg/scheduler/spreading_test.go index 2d71e4a9d81..0e9c9093464 100644 --- a/pkg/scheduler/spreading_test.go +++ b/pkg/scheduler/spreading_test.go @@ -47,21 +47,21 @@ func TestSpreadPriority(t *testing.T) { }{ { nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "nothing scheduled", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{{Status: machine1Status}}, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "no labels", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "different labels", }, { @@ -71,7 +71,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 100}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}}, test: "one label match", }, { @@ -82,7 +82,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 100}, {"machine2", 100}}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "two label matches on different machines", }, { @@ -94,7 +94,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 50}, {"machine2", 100}}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}}, test: "three label matches", }, } diff --git a/pkg/scheduler/types.go b/pkg/scheduler/types.go index cc899c03c97..471529b491d 100644 --- a/pkg/scheduler/types.go +++ b/pkg/scheduler/types.go @@ -47,3 +47,8 @@ func (h 
HostPriorityList) Swap(i, j int) { } type PriorityFunction func(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) + +type PriorityConfig struct { + Function PriorityFunction + Weight int +} diff --git a/plugin/pkg/scheduler/custom_predicates.go b/plugin/pkg/scheduler/custom_predicates.go deleted file mode 100644 index bdee3950871..00000000000 --- a/plugin/pkg/scheduler/custom_predicates.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package scheduler - -import ( - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/golang/glog" -) - -func CreateOnMinion1(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { - glog.V(2).Infof("custom predicate minion1 --> node: %s", node) - if node == "10.245.2.2" { - glog.V(2).Infof("custom predicate minion1 matched") - return true, nil - } else { - glog.V(2).Infof("custom predicate minion1 did not match") - return false, nil - } -} - -func CreateOnMinion2(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { - glog.V(2).Infof("custom predicate minion2 --> node: %s", node) - if node == "10.245.2.3" { - glog.V(2).Infof("custom predicate minion2 matched") - return true, nil - } else { - glog.V(2).Infof("custom predicate minion2 did not match") - return false, nil - } -} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index ba6438067b9..d648704ea40 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -49,9 +49,9 @@ type configFactory struct { // map of strings to predicate functions to be used // to filter the minions for scheduling pods PredicateMap map[string]algorithm.FitPredicate - // map of strings to priority functions to be used + // map of strings to priority config to be used // to prioritize the filtered minions for scheduling pods - PriorityMap map[string]algorithm.PriorityFunction + PriorityMap map[string]algorithm.PriorityConfig } // NewConfigFactory initializes the factory. 
@@ -62,13 +62,10 @@ func NewConfigFactory(client *client.Client) *configFactory { PodLister: &storeToPodLister{cache.NewStore()}, MinionLister: &storeToMinionLister{cache.NewStore()}, PredicateMap: make(map[string]algorithm.FitPredicate), - PriorityMap: make(map[string]algorithm.PriorityFunction), + PriorityMap: make(map[string]algorithm.PriorityConfig), } - // add default predicates factory.addDefaultPredicates() - - // add default predicates factory.addDefaultPriorities() return factory @@ -89,7 +86,7 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch glog.V(2).Infof("Custom priority list not provided, using default priorities") priorityKeys = []string{"LeastRequestedPriority"} } - priorityFuncs, err := factory.getPriorityFunctions(priorityKeys) + priorityConfigs, err := factory.getPriorityConfigs(priorityKeys) if err != nil { return nil, err } @@ -112,7 +109,7 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch r := rand.New(rand.NewSource(time.Now().UnixNano())) - algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs, factory.PodLister, r) + algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, factory.PodLister, r) podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{}, @@ -133,11 +130,10 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch } func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) { - var function algorithm.FitPredicate predicates := []algorithm.FitPredicate{} for _, key := range keys { - function = factory.PredicateMap[key] - if function == nil { + function, ok := factory.PredicateMap[key] + if !ok { return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key) } predicates = append(predicates, function) @@ -145,17 +141,16 @@ func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm. 
return predicates, nil } -func (factory *configFactory) getPriorityFunctions(keys []string) ([]algorithm.PriorityFunction, error) { - var function algorithm.PriorityFunction - priorities := []algorithm.PriorityFunction{} +func (factory *configFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) { + configs := []algorithm.PriorityConfig{} for _, key := range keys { - function = factory.PriorityMap[key] - if function == nil { + config, ok := factory.PriorityMap[key] + if !ok { return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key) } - priorities = append(priorities, function) + configs = append(configs, config) } - return priorities, nil + return configs, nil } func (factory *configFactory) addDefaultPredicates() { @@ -170,13 +165,22 @@ func (factory *configFactory) AddPredicate(key string, function algorithm.FitPre } func (factory *configFactory) addDefaultPriorities() { - factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority) - factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority) - factory.AddPriority("EqualPriority", algorithm.EqualPriority) + factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1) + factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1) + factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0) } -func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) { - factory.PriorityMap[key] = function +func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) { + factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight} +} + +func (factory *configFactory) SetWeight(key string, weight int) { + config, ok := factory.PriorityMap[key] + if !ok { + glog.Errorf("Invalid priority key %s specified - no corresponding function found", key) + return + } + config.Weight = weight } type listWatch struct { @@ -209,7 +213,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) { func (factory *configFactory) createUnassignedPodLW() *listWatch { return &listWatch{ client: factory.Client, - fieldSelector: labels.Set{"Status.Host": ""}.AsSelector(), + fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), resource: "pods", } } @@ -227,7 +231,7 @@ func parseSelectorOrDie(s string) labels.Selector { func (factory *configFactory) createAssignedPodLW() *listWatch { return &listWatch{ client: factory.Client, - fieldSelector: parseSelectorOrDie("Status.Host!="), + fieldSelector: parseSelectorOrDie("DesiredState.Host!="), resource: "pods", } } diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 7d4e2db3de6..6c7d504c495 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -59,12 +59,12 @@ func TestCreateLists(t *testing.T) { }, // Assigned pod { - location: "/api/" + testapi.Version() + "/pods?fields=Status.Host!%3D", + location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host!%3D", factory: factory.createAssignedPodLW, }, // Unassigned pod { - location: "/api/" + testapi.Version() + "/pods?fields=Status.Host%3D", + location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D", factory: factory.createUnassignedPodLW, }, } @@ -108,21 +108,21 @@ func TestCreateWatches(t *testing.T) { // Assigned pod watches { rv: "", - location: "/api/" + 
testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=", + location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=", factory: factory.createAssignedPodLW, }, { rv: "42", - location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=42", + location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", factory: factory.createAssignedPodLW, }, // Unassigned pod watches { rv: "", - location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=", + location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=", factory: factory.createUnassignedPodLW, }, { rv: "42", - location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=42", + location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", factory: factory.createUnassignedPodLW, }, } From 5fa1dbc07bd623df621750f252c79c561faaea6c Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Mon, 1 Dec 2014 11:49:13 -0800 Subject: [PATCH 5/6] Implementing PR feedback - Making ConfigFactory struct public - Added comments for predicate/priority functions and new methods --- pkg/scheduler/generic_scheduler.go | 29 +++++++++++------ plugin/pkg/scheduler/factory/factory.go | 41 +++++++++++++++---------- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index d9d7af2558a..93174d139b9 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -58,6 +58,8 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str return g.selectHost(priorityList) } +// This method takes a prioritized list of minions and sorts them in reverse order based on scores +// and then picks one randomly from the minions that had the highest score func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, error) { if len(priorityList) == 0 { return "", fmt.Errorf("empty priorityList") @@ -72,6 +74,8 @@ func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, er return hosts[ix], nil } +// Filters the minions to find the ones that fit based on the given predicate functions +// Each minion is passed through the predicate functions to determine if it is a fit func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicate, nodes api.MinionList) (api.MinionList, error) { filtered := []api.Minion{} machineToPods, err := MapPodsToMachines(podLister) @@ -97,21 +101,28 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicat return api.MinionList{Items: filtered}, nil } +// Prioritizes the minions by running the individual priority functions sequentially. 
+// Each priority function is expected to set a score of 0-10 +// 0 is the lowest priority score (least preferred minion) and 10 is the highest +// Each priority function can also have its own weight +// The minion scores returned by the priority function are multiplied by the weights to get weighted scores +// All scores are finally combined (added) to get the total weighted scores of all minions func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityConfig, minionLister MinionLister) (HostPriorityList, error) { result := HostPriorityList{} combinedScores := map[string]int{} for _, priority := range priorities { weight := priority.Weight // skip the priority function if the weight is specified as 0 - if weight > 0 { - priorityFunc := priority.Function - prioritizedList, err := priorityFunc(pod, podLister, minionLister) - if err != nil { - return HostPriorityList{}, err - } - for _, hostEntry := range prioritizedList { - combinedScores[hostEntry.host] += hostEntry.score * weight - } + if weight == 0 { + continue + } + priorityFunc := priority.Function + prioritizedList, err := priorityFunc(pod, podLister, minionLister) + if err != nil { + return HostPriorityList{}, err + } + for _, hostEntry := range prioritizedList { + combinedScores[hostEntry.host] += hostEntry.score * weight } } for host, score := range combinedScores { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index d648704ea40..47bc148ea05 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -37,8 +37,8 @@ import ( "github.com/golang/glog" ) -// configFactory knows how to fill out a scheduler config with its support functions. -type configFactory struct { +// ConfigFactory knows how to fill out a scheduler config with its support functions. +type ConfigFactory struct { Client *client.Client // queue for pods that need scheduling PodQueue *cache.FIFO @@ -55,9 +55,9 @@ type configFactory struct { } // NewConfigFactory initializes the factory. -func NewConfigFactory(client *client.Client) *configFactory { +func NewConfigFactory(client *client.Client) *ConfigFactory { // initialize the factory struct - factory := &configFactory{Client: client, + factory := &ConfigFactory{Client: client, PodQueue: cache.NewFIFO(), PodLister: &storeToPodLister{cache.NewStore()}, MinionLister: &storeToMinionLister{cache.NewStore()}, @@ -72,7 +72,7 @@ func NewConfigFactory(client *client.Client) *configFactory { } // Create creates a scheduler and all support functions. -func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) { +func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) { if predicateKeys == nil { glog.V(2).Infof("Custom predicates list not provided, using default predicates") predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"} @@ -129,7 +129,7 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch }, nil } -func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) { +func (factory *ConfigFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) { predicates := []algorithm.FitPredicate{} for _, key := range keys { function, ok := factory.PredicateMap[key] @@ -141,7 +141,7 @@ func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm. 
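To make the weighting scheme described in the comments above concrete, here is a small self-contained sketch (not part of the patch; the minion names, raw scores, and weights are invented): each priority function's 0-10 score is multiplied by that function's weight, the products are summed per minion, and a weight of zero disables the function entirely, exactly as prioritizeNodes does.

    package main

    import "fmt"

    // hostScore mirrors the shape of a HostPriority entry: a minion name
    // and a 0-10 score returned by one priority function.
    type hostScore struct {
        host  string
        score int
    }

    func main() {
        // Hypothetical raw scores from two priority functions.
        leastRequested := []hostScore{{"minion-a", 8}, {"minion-b", 5}, {"minion-c", 2}}
        spreading := []hostScore{{"minion-a", 3}, {"minion-b", 9}, {"minion-c", 6}}

        priorities := []struct {
            scores []hostScore
            weight int
        }{
            {leastRequested, 2}, // e.g. LeastRequestedPriority weighted 2
            {spreading, 1},      // e.g. SpreadingPriority weighted 1
        }

        combined := map[string]int{}
        for _, p := range priorities {
            if p.weight == 0 {
                continue // a zero weight disables the function
            }
            for _, entry := range p.scores {
                combined[entry.host] += entry.score * p.weight
            }
        }
        fmt.Println(combined["minion-a"], combined["minion-b"], combined["minion-c"]) // 19 19 10
    }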
return predicates, nil } -func (factory *configFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) { +func (factory *ConfigFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) { configs := []algorithm.PriorityConfig{} for _, key := range keys { config, ok := factory.PriorityMap[key] @@ -153,28 +153,35 @@ func (factory *configFactory) getPriorityConfigs(keys []string) ([]algorithm.Pri return configs, nil } -func (factory *configFactory) addDefaultPredicates() { +func (factory *ConfigFactory) addDefaultPredicates() { + // Fit is defined based on the absence of port conflicts. factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts) + // Fit is determined by resource availability factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)) + // Fit is determined by non-conflicting disk volumes factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict) + // Fit is determined by node selector query factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)) } -func (factory *configFactory) AddPredicate(key string, function algorithm.FitPredicate) { +func (factory *ConfigFactory) AddPredicate(key string, function algorithm.FitPredicate) { factory.PredicateMap[key] = function } -func (factory *configFactory) addDefaultPriorities() { +func (factory *ConfigFactory) addDefaultPriorities() { + // Prioritize nodes by least requested utilization. factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1) + // spreads pods by minimizing the number of pods on the same minion with the same labels. factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1) + // EqualPriority is a prioritizer function that gives an equal weight of one to all minions factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0) } -func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) { +func (factory *ConfigFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) { factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight} } -func (factory *configFactory) SetWeight(key string, weight int) { +func (factory *ConfigFactory) SetWeight(key string, weight int) { config, ok := factory.PriorityMap[key] if !ok { glog.Errorf("Invalid priority key %s specified - no corresponding function found", key) @@ -210,7 +217,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) { // createUnassignedPodLW returns a listWatch that finds all pods that need to be // scheduled. -func (factory *configFactory) createUnassignedPodLW() *listWatch { +func (factory *ConfigFactory) createUnassignedPodLW() *listWatch { return &listWatch{ client: factory.Client, fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), @@ -228,7 +235,7 @@ func parseSelectorOrDie(s string) labels.Selector { // createAssignedPodLW returns a listWatch that finds all pods that are // already scheduled. -func (factory *configFactory) createAssignedPodLW() *listWatch { +func (factory *ConfigFactory) createAssignedPodLW() *listWatch { return &listWatch{ client: factory.Client, fieldSelector: parseSelectorOrDie("DesiredState.Host!="), @@ -237,7 +244,7 @@ func (factory *configFactory) createAssignedPodLW() *listWatch { } // createMinionLW returns a listWatch that gets all changes to minions. 
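One caveat about SetWeight as added above: PriorityMap stores PriorityConfig values rather than pointers, so the config pulled out of the map is a copy, and assigning to config.Weight alone does not change the stored entry. A minimal write-back variant is sketched below, assuming it sits next to the original in the factory package (so glog and the ConfigFactory fields are in scope):

    // SetWeight updates the weight of an already-registered priority function.
    // Because PriorityMap holds PriorityConfig by value, the modified copy has
    // to be stored back into the map for the new weight to take effect.
    func (factory *ConfigFactory) SetWeight(key string, weight int) {
        config, ok := factory.PriorityMap[key]
        if !ok {
            glog.Errorf("Invalid priority key %s specified - no corresponding function found", key)
            return
        }
        config.Weight = weight
        factory.PriorityMap[key] = config // store the updated copy back
    }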
-func (factory *configFactory) createMinionLW() *listWatch { +func (factory *ConfigFactory) createMinionLW() *listWatch { return &listWatch{ client: factory.Client, fieldSelector: parseSelectorOrDie(""), @@ -246,7 +253,7 @@ func (factory *configFactory) createMinionLW() *listWatch { } // pollMinions lists all minions and returns an enumerator for cache.Poller. -func (factory *configFactory) pollMinions() (cache.Enumerator, error) { +func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { list := &api.MinionList{} err := factory.Client.Get().Path("minions").Do().Into(list) if err != nil { @@ -255,7 +262,7 @@ func (factory *configFactory) pollMinions() (cache.Enumerator, error) { return &minionEnumerator{list}, nil } -func (factory *configFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { +func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { return func(pod *api.Pod, err error) { glog.Errorf("Error scheduling %v: %v; retrying", pod.Name, err) backoff.gc() From d17cebcd2a0bf676fe99dbfbaa1182720cb0adca Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Mon, 8 Dec 2014 12:51:05 -0800 Subject: [PATCH 6/6] Implementing PR feedback and adding test cases --- pkg/scheduler/generic_scheduler.go | 12 +- pkg/scheduler/priorities.go | 24 ++-- pkg/scheduler/priorities_test.go | 139 ++++++++++++++++++++++-- pkg/scheduler/spreading.go | 16 +-- plugin/pkg/scheduler/factory/factory.go | 5 +- 5 files changed, 160 insertions(+), 36 deletions(-) diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index 93174d139b9..3345886e4a5 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -66,7 +66,7 @@ func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, er } sort.Sort(sort.Reverse(priorityList)) - hosts := getMinHosts(priorityList) + hosts := getBestHosts(priorityList) g.randomLock.Lock() defer g.randomLock.Unlock() @@ -107,16 +107,16 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicat // Each priority function can also have its own weight // The minion scores returned by the priority function are multiplied by the weights to get weighted scores // All scores are finally combined (added) to get the total weighted scores of all minions -func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityConfig, minionLister MinionLister) (HostPriorityList, error) { +func prioritizeNodes(pod api.Pod, podLister PodLister, priorityConfigs []PriorityConfig, minionLister MinionLister) (HostPriorityList, error) { result := HostPriorityList{} combinedScores := map[string]int{} - for _, priority := range priorities { - weight := priority.Weight + for _, priorityConfig := range priorityConfigs { + weight := priorityConfig.Weight // skip the priority function if the weight is specified as 0 if weight == 0 { continue } - priorityFunc := priority.Function + priorityFunc := priorityConfig.Function prioritizedList, err := priorityFunc(pod, podLister, minionLister) if err != nil { return HostPriorityList{}, err @@ -131,7 +131,7 @@ func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityConf return result, nil } -func getMinHosts(list HostPriorityList) []string { +func getBestHosts(list HostPriorityList) []string { result := []string{} for _, hostEntry := range list { if hostEntry.score == list[0].score { diff --git 
a/pkg/scheduler/priorities.go b/pkg/scheduler/priorities.go index 25b41aa6a15..34a5bb651e2 100644 --- a/pkg/scheduler/priorities.go +++ b/pkg/scheduler/priorities.go @@ -24,27 +24,37 @@ import ( // the unused capacity is calculated on a scale of 0-10 // 0 being the lowest priority and 10 being the highest -func calculateScore(requested, capacity int) int { +func calculateScore(requested, capacity int, node string) int { if capacity == 0 { return 0 } + if requested > capacity { + glog.Errorf("Combined requested resources from existing pods exceeds capacity on minion: %s", node) + return 0 + } return ((capacity - requested) * 10) / capacity } // Calculate the occupancy on a node. 'node' has information about the resources on the node. // 'pods' is a list of pods currently scheduled on the node. -func calculateOccupancy(node api.Minion, pods []api.Pod) HostPriority { +func calculateOccupancy(pod api.Pod, node api.Minion, pods []api.Pod) HostPriority { totalCPU := 0 totalMemory := 0 - for _, pod := range pods { - for _, container := range pod.Spec.Containers { + for _, existingPod := range pods { + for _, container := range existingPod.Spec.Containers { totalCPU += container.CPU totalMemory += container.Memory } } + // Add the resources requested by the current pod being scheduled. + // This also helps differentiate between differently sized, but empty, minions. + for _, container := range pod.Spec.Containers { + totalCPU += container.CPU + totalMemory += container.Memory + } - cpuScore := calculateScore(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0)) - memoryScore := calculateScore(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0)) + cpuScore := calculateScore(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0), node.Name) + memoryScore := calculateScore(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0), node.Name) glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Score:(%d, %d)", totalCPU, totalMemory, cpuScore, memoryScore) return HostPriority{ @@ -66,7 +76,7 @@ func LeastRequestedPriority(pod api.Pod, podLister PodLister, minionLister Minio list := HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateOccupancy(node, podsToMachines[node.Name])) + list = append(list, calculateOccupancy(pod, node, podsToMachines[node.Name])) } return list, nil } diff --git a/pkg/scheduler/priorities_test.go b/pkg/scheduler/priorities_test.go index 976da176445..e8ebd7c5e86 100644 --- a/pkg/scheduler/priorities_test.go +++ b/pkg/scheduler/priorities_test.go @@ -52,12 +52,14 @@ func TestLeastRequested(t *testing.T) { machine2Status := api.PodStatus{ Host: "machine2", } + noResources := api.PodSpec{ + Containers: []api.Container{}, + } cpuOnly := api.PodSpec{ Containers: []api.Container{ {CPU: 1000}, {CPU: 2000}, }, - // Host: "machine1", } cpuAndMemory := api.PodSpec{ Containers: []api.Container{ @@ -73,14 +75,55 @@ func TestLeastRequested(t *testing.T) { test string }{ { + /* + Minion1 scores (remaining resources) on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion1 Score: (10 + 10) / 2 = 10 + + Minion2 scores (remaining resources) on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion2 Score: (10 + 10) / 2 = 10 + */ + pod: api.Pod{Spec: noResources}, nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, 
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "nothing scheduled", + test: "nothing scheduled, nothing requested", }, { + /* + Minion1 scores on 0-10 scale + CPU Score: (4000 - 3000) / 4000 = 2.5 + Memory Score: (10000 - 5000) / 10000 = 5 + Minion1 Score: (2.5 + 5) / 2 = 3 + + Minion2 scores on 0-10 scale + CPU Score: (6000 - 3000) / 6000 = 5 + Memory Score: (10000 - 5000) / 10000 = 5 + Minion2 Score: (5 + 5) / 2 = 5 + */ + pod: api.Pod{Spec: cpuAndMemory}, + nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 6000, 10000)}, + expectedList: []HostPriority{{"machine1", 3}, {"machine2", 5}}, + test: "nothing scheduled, resources requested, differently sized machines", + }, + { + /* + Minion1 scores on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion1 Score: (10 + 10) / 2 = 10 + + Minion2 scores on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion2 Score: (10 + 10) / 2 = 10 + */ + pod: api.Pod{Spec: noResources}, nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "no resources requested", + test: "no resources requested, pods scheduled", pods: []api.Pod{ {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, @@ -89,18 +132,96 @@ func TestLeastRequested(t *testing.T) { }, }, { - nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, - expectedList: []HostPriority{{"machine1", 6 /* int(200%-75% / 2) */}, {"machine2", 3 /* int( 200%-125% / 2) */}}, - test: "resources requested", + /* + Minion1 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 0) / 20000 = 10 + Minion1 Score: (4 + 10) / 2 = 7 + + Minion2 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 5000) / 20000 = 7.5 + Minion2 Score: (4 + 7.5) / 2 = 5 + */ + pod: api.Pod{Spec: noResources}, + nodes: []api.Minion{makeMinion("machine1", 10000, 20000), makeMinion("machine2", 10000, 20000)}, + expectedList: []HostPriority{{"machine1", 7}, {"machine2", 5}}, + test: "no resources requested, pods scheduled with resources", pods: []api.Pod{ - {Spec: cpuOnly, Status: api.PodStatus{Host: "machine1"}}, - {Spec: cpuAndMemory, Status: api.PodStatus{Host: "machine2"}}, + {Spec: cpuOnly, Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Spec: cpuOnly, Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: cpuOnly, Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: cpuAndMemory, Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, }, { + /* + Minion1 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 5000) / 20000 = 7.5 + Minion1 Score: (4 + 7.5) / 2 = 5 + + Minion2 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 10000) / 20000 = 5 + Minion2 Score: (4 + 5) / 2 = 4 + */ + pod: api.Pod{Spec: cpuAndMemory}, + nodes: []api.Minion{makeMinion("machine1", 10000, 20000), makeMinion("machine2", 10000, 20000)}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 4}}, + test: "resources requested, pods scheduled with resources", + pods: []api.Pod{ + {Spec: cpuOnly, Status: machine1Status}, + {Spec: cpuAndMemory, Status: 
machine2Status}, + }, + }, + { + /* + Minion1 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 5000) / 20000 = 7.5 + Minion1 Score: (4 + 7.5) / 2 = 5 + + Minion2 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (50000 - 10000) / 50000 = 8 + Minion2 Score: (4 + 8) / 2 = 6 + */ + pod: api.Pod{Spec: cpuAndMemory}, + nodes: []api.Minion{makeMinion("machine1", 10000, 20000), makeMinion("machine2", 10000, 50000)}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 6}}, + test: "resources requested, pods scheduled with resources, differently sized machines", + pods: []api.Pod{ + {Spec: cpuOnly, Status: machine1Status}, + {Spec: cpuAndMemory, Status: machine2Status}, + }, + }, + { + /* + Minion1 scores on 0-10 scale + CPU Score: (4000 - 6000) / 4000 = 0 + Memory Score: (10000 - 0) / 10000 = 10 + Minion1 Score: (0 + 10) / 2 = 5 + + Minion2 scores on 0-10 scale + CPU Score: (4000 - 6000) / 4000 = 0 + Memory Score: (10000 - 5000) / 10000 = 5 + Minion2 Score: (0 + 5) / 2 = 2 + */ + pod: api.Pod{Spec: cpuOnly}, + nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 2}}, + test: "requested resources exceed minion capacity", + pods: []api.Pod{ + {Spec: cpuOnly, Status: machine1Status}, + {Spec: cpuAndMemory, Status: machine2Status}, + }, + }, + { + pod: api.Pod{Spec: noResources}, nodes: []api.Minion{makeMinion("machine1", 0, 0), makeMinion("machine2", 0, 0)}, expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, - test: "zero minion resources", + test: "zero minion resources, pods scheduled with resources", pods: []api.Pod{ {Spec: cpuOnly}, {Spec: cpuAndMemory}, diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 314a1d47f13..24e9a8278a1 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -18,7 +18,6 @@ package scheduler import ( "math/rand" - "sort" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -44,18 +43,11 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini if len(pods) > 0 { for _, pod := range pods { counts[pod.Status.Host]++ + // Compute the maximum number of pods hosted on any minion + if counts[pod.Status.Host] > maxCount { + maxCount = counts[pod.Status.Host] + } } - - // doing this separately since the pod count can be much higher - // than the filtered minion count - values := make([]int, len(counts)) - idx := 0 - for _, count := range counts { - values[idx] = count - idx++ - } - sort.Sort(sort.IntSlice(values)) - maxCount = values[len(values)-1] } result := []HostPriority{} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 47bc148ea05..6fd88fbf2dd 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -57,7 +57,8 @@ type ConfigFactory struct { // NewConfigFactory initializes the factory. 
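The scoring arithmetic spelled out in the test comments above can be checked by hand with a short stand-alone sketch (not part of the patch) that mirrors calculateScore's integer math and reproduces the machine1 numbers from the "resources requested, pods scheduled with resources" case:

    package main

    import "fmt"

    // score mirrors calculateScore above: ((capacity - requested) * 10) / capacity,
    // clamped to 0 when the combined requests already exceed capacity.
    func score(requested, capacity int) int {
        if capacity == 0 || requested > capacity {
            return 0
        }
        return ((capacity - requested) * 10) / capacity
    }

    func main() {
        // machine1 from that case: CPU capacity 10000 and memory capacity 20000,
        // with 6000 CPU and 5000 memory requested once the incoming pod is included.
        cpuScore := score(6000, 10000)    // (4000 * 10) / 10000 = 4
        memoryScore := score(5000, 20000) // (15000 * 10) / 20000 = 7
        fmt.Println((cpuScore + memoryScore) / 2) // 5, the expected HostPriority score
    }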
func NewConfigFactory(client *client.Client) *ConfigFactory { // initialize the factory struct - factory := &ConfigFactory{Client: client, + factory := &ConfigFactory{ + Client: client, PodQueue: cache.NewFIFO(), PodLister: &storeToPodLister{cache.NewStore()}, MinionLister: &storeToMinionLister{cache.NewStore()}, @@ -83,7 +84,7 @@ func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*sch } if priorityKeys == nil { - glog.V(2).Infof("Custom priority list not provided, using default priorities") + glog.V(2).Infof("Custom priority list not provided, using default priority: LeastRequestedPriority") priorityKeys = []string{"LeastRequestedPriority"} } priorityConfigs, err := factory.getPriorityConfigs(priorityKeys)
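Taken together, the factory changes in this series let a deployer choose predicates and priorities by key and tune their weights. Purely as an illustration (not part of the patch), a caller along the lines of plugin/cmd/kube-scheduler could build a config that favors spreading over least-requested as sketched below; buildSchedulerConfig, the package name, and the chosen weight are hypothetical, while the factory calls and types follow the code in this series:

    package example

    import (
        "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
        algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
        "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
        "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"

        "github.com/golang/glog"
    )

    // buildSchedulerConfig re-registers a default priority with a higher weight
    // and then creates the scheduler config with explicit priority keys
    // (a nil predicate list falls back to the default predicates).
    func buildSchedulerConfig(kubeClient *client.Client) *scheduler.Config {
        configFactory := factory.NewConfigFactory(kubeClient)

        // Both defaults carry weight 1; give spreading twice the influence.
        configFactory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 2)

        config, err := configFactory.Create(nil, []string{"LeastRequestedPriority", "SpreadingPriority"})
        if err != nil {
            glog.Fatalf("Failed to create scheduler configuration: %v", err)
        }
        return config
    }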