Scheduler changes to allow multiple priority functions

This commit is contained in:
Abhishek Gupta 2014-11-20 14:42:31 -08:00
parent 6b712cc700
commit 1eb28b0aa3
8 changed files with 83 additions and 50 deletions

View File

@ -26,11 +26,11 @@ import (
) )
type genericScheduler struct { type genericScheduler struct {
predicates []FitPredicate predicates []FitPredicate
prioritizer PriorityFunction prioritizers []PriorityFunction
pods PodLister pods PodLister
random *rand.Rand random *rand.Rand
randomLock sync.Mutex randomLock sync.Mutex
} }
func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
@ -47,7 +47,7 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str
return "", err return "", err
} }
priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes)) priorityList, err := prioritizeNodes(pod, g.pods, g.prioritizers, FakeMinionLister(filteredNodes))
if err != nil { if err != nil {
return "", err return "", err
} }
@ -97,6 +97,27 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicat
return api.MinionList{Items: filtered}, nil return api.MinionList{Items: filtered}, nil
} }
func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityFunction, minionLister MinionLister) (HostPriorityList, error) {
result := HostPriorityList{}
combinedScores := map[string]int{}
for _, priority := range priorities {
prioritizedList, err := priority(pod, podLister, minionLister)
if err != nil {
return HostPriorityList{}, err
}
if len(priorities) == 1 {
return prioritizedList, nil
}
for _, hostEntry := range prioritizedList {
combinedScores[hostEntry.host] += hostEntry.score
}
}
for host, score := range combinedScores {
result = append(result, HostPriority{host: host, score: score})
}
return result, nil
}
func getMinHosts(list HostPriorityList) []string { func getMinHosts(list HostPriorityList) []string {
result := []string{} result := []string{}
for _, hostEntry := range list { for _, hostEntry := range list {
@ -127,10 +148,10 @@ func EqualPriority(pod api.Pod, podLister PodLister, minionLister MinionLister)
return result, nil return result, nil
} }
func NewGenericScheduler(predicates []FitPredicate, prioritizer PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityFunction, pods PodLister, random *rand.Rand) Scheduler {
return &genericScheduler{ return &genericScheduler{
predicates: predicates, predicates: predicates,
prioritizer: prioritizer, prioritizers: prioritizers,
pods: pods, pods: pods,
random: random, random: random,
} }

View File

@ -138,57 +138,57 @@ func TestSelectHost(t *testing.T) {
func TestGenericScheduler(t *testing.T) { func TestGenericScheduler(t *testing.T) {
tests := []struct { tests := []struct {
predicates []FitPredicate predicates []FitPredicate
prioritizer PriorityFunction prioritizers []PriorityFunction
nodes []string nodes []string
pod api.Pod pod api.Pod
expectedHost string expectedHost string
expectsErr bool expectsErr bool
}{ }{
{ {
predicates: []FitPredicate{falsePredicate}, predicates: []FitPredicate{falsePredicate},
prioritizer: EqualPriority, prioritizers: []PriorityFunction{EqualPriority},
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
expectsErr: true, expectsErr: true,
}, },
{ {
predicates: []FitPredicate{truePredicate}, predicates: []FitPredicate{truePredicate},
prioritizer: EqualPriority, prioritizers: []PriorityFunction{EqualPriority},
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
// Random choice between both, the rand seeded above with zero, chooses "machine2" // Random choice between both, the rand seeded above with zero, chooses "machine2"
expectedHost: "machine2", expectedHost: "machine2",
}, },
{ {
// Fits on a machine where the pod ID matches the machine name // Fits on a machine where the pod ID matches the machine name
predicates: []FitPredicate{matchesPredicate}, predicates: []FitPredicate{matchesPredicate},
prioritizer: EqualPriority, prioritizers: []PriorityFunction{EqualPriority},
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}},
expectedHost: "machine2", expectedHost: "machine2",
}, },
{ {
predicates: []FitPredicate{truePredicate}, predicates: []FitPredicate{truePredicate},
prioritizer: numericPriority, prioritizers: []PriorityFunction{numericPriority},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
expectedHost: "1", expectedHost: "1",
}, },
{ {
predicates: []FitPredicate{matchesPredicate}, predicates: []FitPredicate{matchesPredicate},
prioritizer: numericPriority, prioritizers: []PriorityFunction{numericPriority},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
expectedHost: "2", expectedHost: "2",
}, },
{ {
predicates: []FitPredicate{truePredicate, falsePredicate}, predicates: []FitPredicate{truePredicate, falsePredicate},
prioritizer: numericPriority, prioritizers: []PriorityFunction{numericPriority},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
expectsErr: true, expectsErr: true,
}, },
} }
for _, test := range tests { for _, test := range tests {
random := rand.New(rand.NewSource(0)) random := rand.New(rand.NewSource(0))
scheduler := NewGenericScheduler(test.predicates, test.prioritizer, FakePodLister([]api.Pod{}), random) scheduler := NewGenericScheduler(test.predicates, test.prioritizers, FakePodLister([]api.Pod{}), random)
machine, err := scheduler.Schedule(test.pod, FakeMinionLister(makeMinionList(test.nodes))) machine, err := scheduler.Schedule(test.pod, FakeMinionLister(makeMinionList(test.nodes)))
if test.expectsErr { if test.expectsErr {
if err == nil { if err == nil {

View File

@ -74,7 +74,6 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
// TODO: migrate this into some per-volume specific code? // TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
manifest := &(pod.Spec) manifest := &(pod.Spec)
glog.Errorf("custom predicate NoDiskConflict --> node: %s", node)
for ix := range manifest.Volumes { for ix := range manifest.Volumes {
for podIx := range existingPods { for podIx := range existingPods {
if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) { if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) {
@ -105,7 +104,6 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
// PodFitsResources calculates fit based on requested, rather than used resources // PodFitsResources calculates fit based on requested, rather than used resources
func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.Errorf("custom predicate PodFitsResources --> node: %s", node)
podRequest := getResourceRequest(&pod) podRequest := getResourceRequest(&pod)
if podRequest.milliCPU == 0 && podRequest.memory == 0 { if podRequest.milliCPU == 0 && podRequest.memory == 0 {
// no resources requested always fits. // no resources requested always fits.
@ -154,7 +152,6 @@ type NodeSelector struct {
func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
if len(pod.Spec.NodeSelector) == 0 { if len(pod.Spec.NodeSelector) == 0 {
glog.Errorf("custom predicate PodSelectorMatches --> node: %s", node)
return true, nil return true, nil
} }
selector := labels.SelectorFromSet(pod.Spec.NodeSelector) selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
@ -166,7 +163,6 @@ func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, n
} }
func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.Errorf("custom predicate PodFitsPorts --> node: %s", node)
existingPorts := getUsedPorts(existingPods...) existingPorts := getUsedPorts(existingPods...)
wantPorts := getUsedPorts(pod) wantPorts := getUsedPorts(pod)
for wport := range wantPorts { for wport := range wantPorts {

View File

@ -18,6 +18,7 @@ package scheduler
import ( import (
"math/rand" "math/rand"
"sort"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -37,18 +38,37 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
return nil, err return nil, err
} }
var maxCount int
var fScore float32
counts := map[string]int{} counts := map[string]int{}
for _, pod := range pods { if len(pods) > 0 {
counts[pod.Status.Host]++ for _, pod := range pods {
counts[pod.Status.Host]++
}
// doing this separately since the pod count can be much higher
// than the filtered minion count
values := make([]int, len(counts))
idx := 0
for _, count := range counts {
values[idx] = count
idx++
}
sort.Sort(sort.IntSlice(values))
maxCount = values[len(values)-1]
} }
result := []HostPriority{} result := []HostPriority{}
//score int
for _, minion := range minions.Items { for _, minion := range minions.Items {
result = append(result, HostPriority{host: minion.Name, score: counts[minion.Name]}) if maxCount > 0 {
fScore = 100 * ( float32(counts[minion.Name]) / float32(maxCount) )
}
result = append(result, HostPriority{host: minion.Name, score: int(fScore)})
} }
return result, nil return result, nil
} }
func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler {
return NewGenericScheduler(predicates, CalculateSpreadPriority, podLister, random) return NewGenericScheduler(predicates, []PriorityFunction{CalculateSpreadPriority}, podLister, random)
} }

View File

@ -71,7 +71,7 @@ func TestSpreadPriority(t *testing.T) {
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
}, },
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 1}}, expectedList: []HostPriority{{"machine1", 0}, {"machine2", 100}},
test: "one label match", test: "one label match",
}, },
{ {
@ -82,7 +82,7 @@ func TestSpreadPriority(t *testing.T) {
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
}, },
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 1}, {"machine2", 1}}, expectedList: []HostPriority{{"machine1", 100}, {"machine2", 100}},
test: "two label matches on different machines", test: "two label matches on different machines",
}, },
{ {
@ -94,7 +94,7 @@ func TestSpreadPriority(t *testing.T) {
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
}, },
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 1}, {"machine2", 2}}, expectedList: []HostPriority{{"machine1", 50}, {"machine2", 100}},
test: "three label matches", test: "three label matches",
}, },
} }

View File

@ -61,10 +61,7 @@ func main() {
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
configFactory := factory.NewConfigFactory(kubeClient) configFactory := factory.NewConfigFactory(kubeClient)
configFactory.AddPredicate("CreateOnMinion1", scheduler.CreateOnMinion1) config, err := configFactory.Create(nil, nil)
configFactory.AddPredicate("CreateOnMinion2", scheduler.CreateOnMinion2)
config, err := configFactory.Create([]string{"CreateOnMinion2"}, nil)
if err != nil { if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err) glog.Fatalf("Failed to create scheduler configuration: %v", err)
} }

View File

@ -112,7 +112,7 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs[0], factory.PodLister, r) algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs, factory.PodLister, r)
podBackoff := podBackoff{ podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{}, perPodBackoff: map[string]*backoffEntry{},
@ -126,7 +126,6 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch
NextPod: func() *api.Pod { NextPod: func() *api.Pod {
pod := factory.PodQueue.Pop().(*api.Pod) pod := factory.PodQueue.Pop().(*api.Pod)
glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name) glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name)
glog.Errorf("glog.error --> About to try and schedule pod %v", pod.Name)
return pod return pod
}, },
Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue), Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue),
@ -137,7 +136,6 @@ func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.
var function algorithm.FitPredicate var function algorithm.FitPredicate
predicates := []algorithm.FitPredicate{} predicates := []algorithm.FitPredicate{}
for _, key := range keys { for _, key := range keys {
glog.Errorf("Adding predicate function for key: %s", key)
function = factory.PredicateMap[key] function = factory.PredicateMap[key]
if function == nil { if function == nil {
return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key) return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
@ -174,6 +172,7 @@ func (factory *configFactory) AddPredicate(key string, function algorithm.FitPre
func (factory *configFactory) addDefaultPriorities() { func (factory *configFactory) addDefaultPriorities() {
factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority) factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority)
factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority) factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority)
factory.AddPriority("EqualPriority", algorithm.EqualPriority)
} }
func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) { func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) {
@ -210,7 +209,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
func (factory *configFactory) createUnassignedPodLW() *listWatch { func (factory *configFactory) createUnassignedPodLW() *listWatch {
return &listWatch{ return &listWatch{
client: factory.Client, client: factory.Client,
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), fieldSelector: labels.Set{"Status.Host": ""}.AsSelector(),
resource: "pods", resource: "pods",
} }
} }
@ -228,7 +227,7 @@ func parseSelectorOrDie(s string) labels.Selector {
func (factory *configFactory) createAssignedPodLW() *listWatch { func (factory *configFactory) createAssignedPodLW() *listWatch {
return &listWatch{ return &listWatch{
client: factory.Client, client: factory.Client,
fieldSelector: parseSelectorOrDie("DesiredState.Host!="), fieldSelector: parseSelectorOrDie("Status.Host!="),
resource: "pods", resource: "pods",
} }
} }

View File

@ -59,12 +59,12 @@ func TestCreateLists(t *testing.T) {
}, },
// Assigned pod // Assigned pod
{ {
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host!%3D", location: "/api/" + testapi.Version() + "/pods?fields=Status.Host!%3D",
factory: factory.createAssignedPodLW, factory: factory.createAssignedPodLW,
}, },
// Unassigned pod // Unassigned pod
{ {
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D", location: "/api/" + testapi.Version() + "/pods?fields=Status.Host%3D",
factory: factory.createUnassignedPodLW, factory: factory.createUnassignedPodLW,
}, },
} }
@ -108,21 +108,21 @@ func TestCreateWatches(t *testing.T) {
// Assigned pod watches // Assigned pod watches
{ {
rv: "", rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=", location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=",
factory: factory.createAssignedPodLW, factory: factory.createAssignedPodLW,
}, { }, {
rv: "42", rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=42",
factory: factory.createAssignedPodLW, factory: factory.createAssignedPodLW,
}, },
// Unassigned pod watches // Unassigned pod watches
{ {
rv: "", rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=", location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=",
factory: factory.createUnassignedPodLW, factory: factory.createUnassignedPodLW,
}, { }, {
rv: "42", rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=42",
factory: factory.createUnassignedPodLW, factory: factory.createUnassignedPodLW,
}, },
} }