diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index fc673027bc6..3a5026fd8c7 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -168,8 +168,11 @@ func startComponents(manifestURL string) (apiServerURL string) { handler.delegate = m.Handler // Scheduler - schedulerConfigFactory := &factory.ConfigFactory{cl} - schedulerConfig := schedulerConfigFactory.Create() + schedulerConfigFactory := factory.NewConfigFactory(cl) + schedulerConfig, err := schedulerConfigFactory.Create(nil, nil) + if err != nil { + glog.Fatal("Couldn't create scheduler config: %v", err) + } scheduler.New(schedulerConfig).Run() endpoints := service.NewEndpointController(cl) diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go index a65b726e9f0..3345886e4a5 100644 --- a/pkg/scheduler/generic_scheduler.go +++ b/pkg/scheduler/generic_scheduler.go @@ -26,11 +26,11 @@ import ( ) type genericScheduler struct { - predicates []FitPredicate - prioritizer PriorityFunction - pods PodLister - random *rand.Rand - randomLock sync.Mutex + predicates []FitPredicate + prioritizers []PriorityConfig + pods PodLister + random *rand.Rand + randomLock sync.Mutex } func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { @@ -38,27 +38,35 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str if err != nil { return "", err } + if len(minions.Items) == 0 { + return "", fmt.Errorf("no minions available to schedule pods") + } + filteredNodes, err := findNodesThatFit(pod, g.pods, g.predicates, minions) if err != nil { return "", err } - priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes)) + + priorityList, err := prioritizeNodes(pod, g.pods, g.prioritizers, FakeMinionLister(filteredNodes)) if err != nil { return "", err } if len(priorityList) == 0 { return "", fmt.Errorf("failed to find a fit for pod: %v", pod) } + return g.selectHost(priorityList) } +// This method takes a prioritized list of minions and sorts them in reverse order based on scores +// and then picks one randomly from the minions that had the highest score func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, error) { if len(priorityList) == 0 { return "", fmt.Errorf("empty priorityList") } - sort.Sort(priorityList) + sort.Sort(sort.Reverse(priorityList)) - hosts := getMinHosts(priorityList) + hosts := getBestHosts(priorityList) g.randomLock.Lock() defer g.randomLock.Unlock() @@ -66,6 +74,8 @@ func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, er return hosts[ix], nil } +// Filters the minions to find the ones that fit based on the given predicate functions +// Each minion is passed through the predicate functions to determine if it is a fit func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicate, nodes api.MinionList) (api.MinionList, error) { filtered := []api.Minion{} machineToPods, err := MapPodsToMachines(podLister) @@ -91,7 +101,37 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicat return api.MinionList{Items: filtered}, nil } -func getMinHosts(list HostPriorityList) []string { +// Prioritizes the minions by running the individual priority functions sequentially. 
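
Aside: a worked illustration of the weighted combination this function performs, as a standalone sketch with made-up hosts, scores, and weights (not taken from this patch):

package main

import "fmt"

func main() {
	// Per-host scores returned by two hypothetical priority functions,
	// weighted 1 and 2 respectively; all values here are made up.
	configs := []struct {
		scores map[string]int
		weight int
	}{
		{scores: map[string]int{"minionA": 5, "minionB": 7}, weight: 1},
		{scores: map[string]int{"minionA": 3, "minionB": 1}, weight: 2},
	}

	// Accumulate weighted scores per host, skipping zero-weight functions,
	// mirroring the combination described in the surrounding comment.
	combined := map[string]int{}
	for _, c := range configs {
		if c.weight == 0 {
			continue
		}
		for host, score := range c.scores {
			combined[host] += score * c.weight
		}
	}
	fmt.Println(combined["minionA"], combined["minionB"]) // 11 9
}
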
+// Each priority function is expected to set a score of 0-10 +// 0 is the lowest priority score (least preferred minion) and 10 is the highest +// Each priority function can also have its own weight +// The minion scores returned by the priority function are multiplied by the weights to get weighted scores +// All scores are finally combined (added) to get the total weighted scores of all minions +func prioritizeNodes(pod api.Pod, podLister PodLister, priorityConfigs []PriorityConfig, minionLister MinionLister) (HostPriorityList, error) { + result := HostPriorityList{} + combinedScores := map[string]int{} + for _, priorityConfig := range priorityConfigs { + weight := priorityConfig.Weight + // skip the priority function if the weight is specified as 0 + if weight == 0 { + continue + } + priorityFunc := priorityConfig.Function + prioritizedList, err := priorityFunc(pod, podLister, minionLister) + if err != nil { + return HostPriorityList{}, err + } + for _, hostEntry := range prioritizedList { + combinedScores[hostEntry.host] += hostEntry.score * weight + } + } + for host, score := range combinedScores { + result = append(result, HostPriority{host: host, score: score}) + } + return result, nil +} + +func getBestHosts(list HostPriorityList) []string { result := []string{} for _, hostEntry := range list { if hostEntry.score == list[0].score { @@ -121,11 +161,11 @@ func EqualPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) return result, nil } -func NewGenericScheduler(predicates []FitPredicate, prioritizer PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { +func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityConfig, pods PodLister, random *rand.Rand) Scheduler { return &genericScheduler{ - predicates: predicates, - prioritizer: prioritizer, - pods: pods, - random: random, + predicates: predicates, + prioritizers: prioritizers, + pods: pods, + random: random, } } diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go index c5cbbcdca33..977c28c576b 100644 --- a/pkg/scheduler/generic_scheduler_test.go +++ b/pkg/scheduler/generic_scheduler_test.go @@ -18,6 +18,7 @@ package scheduler import ( "fmt" + "math" "math/rand" "strconv" "testing" @@ -59,6 +60,29 @@ func numericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister return result, nil } +func reverseNumericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var maxScore float64 + minScore := math.MaxFloat64 + reverseResult := []HostPriority{} + result, err := numericPriority(pod, podLister, minionLister) + if err != nil { + return nil, err + } + + for _, hostPriority := range result { + maxScore = math.Max(maxScore, float64(hostPriority.score)) + minScore = math.Min(minScore, float64(hostPriority.score)) + } + for _, hostPriority := range result { + reverseResult = append(reverseResult, HostPriority{ + host: hostPriority.host, + score: int(maxScore + minScore - float64(hostPriority.score)), + }) + } + + return reverseResult, nil +} + func makeMinionList(nodeNames []string) api.MinionList { result := api.MinionList{ Items: make([]api.Minion, len(nodeNames)), @@ -81,28 +105,28 @@ func TestSelectHost(t *testing.T) { {host: "machine1.1", score: 1}, {host: "machine2.1", score: 2}, }, - possibleHosts: util.NewStringSet("machine1.1"), + possibleHosts: util.NewStringSet("machine2.1"), expectsErr: false, }, // equal scores { list: []HostPriority{ {host: "machine1.1", score: 1}, - {host: 
"machine1.2", score: 1}, - {host: "machine1.3", score: 1}, + {host: "machine1.2", score: 2}, + {host: "machine1.3", score: 2}, {host: "machine2.1", score: 2}, }, - possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"), + possibleHosts: util.NewStringSet("machine1.2", "machine1.3", "machine2.1"), expectsErr: false, }, // out of order scores { list: []HostPriority{ - {host: "machine1.1", score: 1}, - {host: "machine1.2", score: 1}, + {host: "machine1.1", score: 3}, + {host: "machine1.2", score: 3}, {host: "machine2.1", score: 2}, - {host: "machine3.1", score: 3}, - {host: "machine1.3", score: 1}, + {host: "machine3.1", score: 1}, + {host: "machine1.3", score: 3}, }, possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"), expectsErr: false, @@ -137,58 +161,73 @@ func TestSelectHost(t *testing.T) { func TestGenericScheduler(t *testing.T) { tests := []struct { + name string predicates []FitPredicate - prioritizer PriorityFunction + prioritizers []PriorityConfig nodes []string pod api.Pod expectedHost string expectsErr bool }{ { - predicates: []FitPredicate{falsePredicate}, - prioritizer: EqualPriority, - nodes: []string{"machine1", "machine2"}, - expectsErr: true, + predicates: []FitPredicate{falsePredicate}, + prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}}, + nodes: []string{"machine1", "machine2"}, + expectsErr: true, + name: "test 1", }, { - predicates: []FitPredicate{truePredicate}, - prioritizer: EqualPriority, - nodes: []string{"machine1", "machine2"}, - // Random choice between both, the rand seeded above with zero, chooses "machine2" - expectedHost: "machine2", + predicates: []FitPredicate{truePredicate}, + prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}}, + nodes: []string{"machine1", "machine2"}, + // Random choice between both, the rand seeded above with zero, chooses "machine1" + expectedHost: "machine1", + name: "test 2", }, { // Fits on a machine where the pod ID matches the machine name predicates: []FitPredicate{matchesPredicate}, - prioritizer: EqualPriority, + prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}}, nodes: []string{"machine1", "machine2"}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}}, expectedHost: "machine2", + name: "test 3", }, { predicates: []FitPredicate{truePredicate}, - prioritizer: numericPriority, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}}, nodes: []string{"3", "2", "1"}, - expectedHost: "1", + expectedHost: "3", + name: "test 4", }, { predicates: []FitPredicate{matchesPredicate}, - prioritizer: numericPriority, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}}, nodes: []string{"3", "2", "1"}, pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, expectedHost: "2", + name: "test 5", }, { - predicates: []FitPredicate{truePredicate, falsePredicate}, - prioritizer: numericPriority, - nodes: []string{"3", "2", "1"}, - expectsErr: true, + predicates: []FitPredicate{truePredicate}, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}, {Function: reverseNumericPriority, Weight: 2}}, + nodes: []string{"3", "2", "1"}, + pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, + expectedHost: "1", + name: "test 6", + }, + { + predicates: []FitPredicate{truePredicate, falsePredicate}, + prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}}, + nodes: []string{"3", "2", "1"}, + expectsErr: true, + name: "test 7", }, } for _, test := range tests { random := 
rand.New(rand.NewSource(0)) - scheduler := NewGenericScheduler(test.predicates, test.prioritizer, FakePodLister([]api.Pod{}), random) + scheduler := NewGenericScheduler(test.predicates, test.prioritizers, FakePodLister([]api.Pod{}), random) machine, err := scheduler.Schedule(test.pod, FakeMinionLister(makeMinionList(test.nodes))) if test.expectsErr { if err == nil { @@ -199,7 +238,7 @@ func TestGenericScheduler(t *testing.T) { t.Errorf("Unexpected error: %v", err) } if test.expectedHost != machine { - t.Errorf("Expected: %s, Saw: %s", test.expectedHost, machine) + t.Errorf("Failed : %s, Expected: %s, Saw: %s", test.name, test.expectedHost, machine) } } } diff --git a/pkg/scheduler/priorities.go b/pkg/scheduler/priorities.go index baa65c96b22..34a5bb651e2 100644 --- a/pkg/scheduler/priorities.go +++ b/pkg/scheduler/priorities.go @@ -22,32 +22,44 @@ import ( "github.com/golang/glog" ) -func calculatePercentage(requested, capacity int) int { +// the unused capacity is calculated on a scale of 0-10 +// 0 being the lowest priority and 10 being the highest +func calculateScore(requested, capacity int, node string) int { if capacity == 0 { return 0 } - return (requested * 100) / capacity + if requested > capacity { + glog.Errorf("Combined requested resources from existing pods exceeds capacity on minion: %s", node) + return 0 + } + return ((capacity - requested) * 10) / capacity } // Calculate the occupancy on a node. 'node' has information about the resources on the node. // 'pods' is a list of pods currently scheduled on the node. -func calculateOccupancy(node api.Minion, pods []api.Pod) HostPriority { +func calculateOccupancy(pod api.Pod, node api.Minion, pods []api.Pod) HostPriority { totalCPU := 0 totalMemory := 0 - for _, pod := range pods { - for _, container := range pod.Spec.Containers { + for _, existingPod := range pods { + for _, container := range existingPod.Spec.Containers { totalCPU += container.CPU totalMemory += container.Memory } } + // Add the resources requested by the current pod being scheduled. + // This also helps differentiate between differently sized, but empty, minions. 
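
For a quick sanity check of the 0-10 formula in calculateScore above, ((capacity - requested) * 10) / capacity with integer truncation, here is a standalone sketch with made-up capacities:

package main

import "fmt"

// score reproduces the integer arithmetic of calculateScore for illustration:
// unused capacity mapped onto 0-10, with 0 returned when capacity is zero or
// the requested total already exceeds capacity.
func score(requested, capacity int) int {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return ((capacity - requested) * 10) / capacity
}

func main() {
	fmt.Println(score(3000, 4000))  // 2 (2.5 truncated)
	fmt.Println(score(5000, 10000)) // 5
	fmt.Println(score(6000, 4000))  // 0, requested exceeds capacity
}
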
+ for _, container := range pod.Spec.Containers { + totalCPU += container.CPU + totalMemory += container.Memory + } - percentageCPU := calculatePercentage(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0)) - percentageMemory := calculatePercentage(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0)) - glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Percentage:(%d\\%m, %d\\%)", totalCPU, totalMemory, percentageCPU, percentageMemory) + cpuScore := calculateScore(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0), node.Name) + memoryScore := calculateScore(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0), node.Name) + glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Score:(%d, %d)", totalCPU, totalMemory, cpuScore, memoryScore) return HostPriority{ host: node.Name, - score: int((percentageCPU + percentageMemory) / 2), + score: int((cpuScore + memoryScore) / 2), } } @@ -64,7 +76,7 @@ func LeastRequestedPriority(pod api.Pod, podLister PodLister, minionLister Minio list := HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateOccupancy(node, podsToMachines[node.Name])) + list = append(list, calculateOccupancy(pod, node, podsToMachines[node.Name])) } return list, nil } diff --git a/pkg/scheduler/priorities_test.go b/pkg/scheduler/priorities_test.go index 2cde2ff5d52..e8ebd7c5e86 100644 --- a/pkg/scheduler/priorities_test.go +++ b/pkg/scheduler/priorities_test.go @@ -52,12 +52,14 @@ func TestLeastRequested(t *testing.T) { machine2Status := api.PodStatus{ Host: "machine2", } + noResources := api.PodSpec{ + Containers: []api.Container{}, + } cpuOnly := api.PodSpec{ Containers: []api.Container{ {CPU: 1000}, {CPU: 2000}, }, - // Host: "machine1", } cpuAndMemory := api.PodSpec{ Containers: []api.Container{ @@ -73,14 +75,55 @@ func TestLeastRequested(t *testing.T) { test string }{ { + /* + Minion1 scores (remaining resources) on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion1 Score: (10 + 10) / 2 = 10 + + Minion2 scores (remaining resources) on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion2 Score: (10 + 10) / 2 = 10 + */ + pod: api.Pod{Spec: noResources}, nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, - test: "nothing scheduled", + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, + test: "nothing scheduled, nothing requested", }, { + /* + Minion1 scores on 0-10 scale + CPU Score: (4000 - 3000) / 4000 = 2.5 + Memory Score: (10000 - 5000) / 10000 = 5 + Minion1 Score: (2.5 + 5) / 2 = 3 + + Minion2 scores on 0-10 scale + CPU Score: (6000 - 3000) / 6000 = 5 + Memory Score: (10000 - 5000) / 10000 = 5 + Minion2 Score: (5 + 5) / 2 = 5 + */ + pod: api.Pod{Spec: cpuAndMemory}, + nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 6000, 10000)}, + expectedList: []HostPriority{{"machine1", 3}, {"machine2", 5}}, + test: "nothing scheduled, resources requested, differently sized machines", + }, + { + /* + Minion1 scores on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion1 Score: (10 + 10) / 2 = 10 + + Minion2 scores on 0-10 scale + CPU Score: (4000 - 0) / 4000 = 10 + Memory Score: (10000 - 0) / 10000 = 10 + Minion2 Score: 
(10 + 10) / 2 = 10 + */ + pod: api.Pod{Spec: noResources}, nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, - test: "no resources requested", + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, + test: "no resources requested, pods scheduled", pods: []api.Pod{ {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, @@ -89,18 +132,96 @@ func TestLeastRequested(t *testing.T) { }, }, { - nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, - expectedList: []HostPriority{{"machine1", 37 /* int(75% / 2) */}, {"machine2", 62 /* int( 75% + 50% / 2) */}}, - test: "no resources requested", + /* + Minion1 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 0) / 20000 = 10 + Minion1 Score: (4 + 10) / 2 = 7 + + Minion2 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 5000) / 20000 = 7.5 + Minion2 Score: (4 + 7.5) / 2 = 5 + */ + pod: api.Pod{Spec: noResources}, + nodes: []api.Minion{makeMinion("machine1", 10000, 20000), makeMinion("machine2", 10000, 20000)}, + expectedList: []HostPriority{{"machine1", 7}, {"machine2", 5}}, + test: "no resources requested, pods scheduled with resources", pods: []api.Pod{ - {Spec: cpuOnly, Status: api.PodStatus{Host: "machine1"}}, - {Spec: cpuAndMemory, Status: api.PodStatus{Host: "machine2"}}, + {Spec: cpuOnly, Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Spec: cpuOnly, Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: cpuOnly, Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: cpuAndMemory, Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, }, { + /* + Minion1 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 5000) / 20000 = 7.5 + Minion1 Score: (4 + 7.5) / 2 = 5 + + Minion2 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 10000) / 20000 = 5 + Minion2 Score: (4 + 5) / 2 = 4 + */ + pod: api.Pod{Spec: cpuAndMemory}, + nodes: []api.Minion{makeMinion("machine1", 10000, 20000), makeMinion("machine2", 10000, 20000)}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 4}}, + test: "resources requested, pods scheduled with resources", + pods: []api.Pod{ + {Spec: cpuOnly, Status: machine1Status}, + {Spec: cpuAndMemory, Status: machine2Status}, + }, + }, + { + /* + Minion1 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (20000 - 5000) / 20000 = 7.5 + Minion1 Score: (4 + 7.5) / 2 = 5 + + Minion2 scores on 0-10 scale + CPU Score: (10000 - 6000) / 10000 = 4 + Memory Score: (50000 - 10000) / 50000 = 8 + Minion2 Score: (4 + 8) / 2 = 6 + */ + pod: api.Pod{Spec: cpuAndMemory}, + nodes: []api.Minion{makeMinion("machine1", 10000, 20000), makeMinion("machine2", 10000, 50000)}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 6}}, + test: "resources requested, pods scheduled with resources, differently sized machines", + pods: []api.Pod{ + {Spec: cpuOnly, Status: machine1Status}, + {Spec: cpuAndMemory, Status: machine2Status}, + }, + }, + { + /* + Minion1 scores on 0-10 scale + CPU Score: (4000 - 6000) / 4000 = 0 + Memory Score: (10000 - 0) / 10000 = 10 + Minion1 Score: (0 + 10) / 2 = 5 + + Minion2 scores on 0-10 scale + CPU Score: (4000 - 
6000) / 4000 = 0 + Memory Score: (10000 - 5000) / 10000 = 5 + Minion2 Score: (0 + 5) / 2 = 2 + */ + pod: api.Pod{Spec: cpuOnly}, + nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 2}}, + test: "requested resources exceed minion capacity", + pods: []api.Pod{ + {Spec: cpuOnly, Status: machine1Status}, + {Spec: cpuAndMemory, Status: machine2Status}, + }, + }, + { + pod: api.Pod{Spec: noResources}, nodes: []api.Minion{makeMinion("machine1", 0, 0), makeMinion("machine2", 0, 0)}, expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, - test: "zero minion resources", + test: "zero minion resources, pods scheduled with resources", pods: []api.Pod{ {Spec: cpuOnly}, {Spec: cpuAndMemory}, diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 751d7f45195..24e9a8278a1 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -37,18 +37,31 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini return nil, err } + var maxCount int + var fScore float32 = 10.0 counts := map[string]int{} - for _, pod := range pods { - counts[pod.Status.Host]++ + if len(pods) > 0 { + for _, pod := range pods { + counts[pod.Status.Host]++ + // Compute the maximum number of pods hosted on any minion + if counts[pod.Status.Host] > maxCount { + maxCount = counts[pod.Status.Host] + } + } } result := []HostPriority{} + //score int - scale of 0-10 + // 0 being the lowest priority and 10 being the highest for _, minion := range minions.Items { - result = append(result, HostPriority{host: minion.Name, score: counts[minion.Name]}) + if maxCount > 0 { + fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount)) + } + result = append(result, HostPriority{host: minion.Name, score: int(fScore)}) } return result, nil } func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { - return NewGenericScheduler(predicates, CalculateSpreadPriority, podLister, random) + return NewGenericScheduler(predicates, []PriorityConfig{{Function: CalculateSpreadPriority, Weight: 1}}, podLister, random) } diff --git a/pkg/scheduler/spreading_test.go b/pkg/scheduler/spreading_test.go index 4301fd40f7b..0e9c9093464 100644 --- a/pkg/scheduler/spreading_test.go +++ b/pkg/scheduler/spreading_test.go @@ -47,21 +47,21 @@ func TestSpreadPriority(t *testing.T) { }{ { nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "nothing scheduled", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{{Status: machine1Status}}, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "no labels", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "different labels", }, { @@ -71,7 +71,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: 
[]HostPriority{{"machine1", 0}, {"machine2", 1}}, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}}, test: "one label match", }, { @@ -82,7 +82,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 1}, {"machine2", 1}}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "two label matches on different machines", }, { @@ -94,7 +94,7 @@ func TestSpreadPriority(t *testing.T) { {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - expectedList: []HostPriority{{"machine1", 1}, {"machine2", 2}}, + expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}}, test: "three label matches", }, } diff --git a/pkg/scheduler/types.go b/pkg/scheduler/types.go index cc899c03c97..471529b491d 100644 --- a/pkg/scheduler/types.go +++ b/pkg/scheduler/types.go @@ -47,3 +47,8 @@ func (h HostPriorityList) Swap(i, j int) { } type PriorityFunction func(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) + +type PriorityConfig struct { + Function PriorityFunction + Weight int +} diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index abf23b770cd..377162c87e1 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -104,8 +104,11 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p // RunScheduler starts up a scheduler in it's own goroutine func RunScheduler(cl *client.Client) { // Scheduler - schedulerConfigFactory := &factory.ConfigFactory{cl} - schedulerConfig := schedulerConfigFactory.Create() + schedulerConfigFactory := factory.NewConfigFactory(cl) + schedulerConfig, err := schedulerConfigFactory.Create(nil, nil) + if err != nil { + glog.Fatal("Couldn't create scheduler config: %v", err) + } scheduler.New(schedulerConfig).Run() } diff --git a/plugin/cmd/kube-scheduler/scheduler.go b/plugin/cmd/kube-scheduler/scheduler.go index 3b5541af0be..75af1276ea4 100644 --- a/plugin/cmd/kube-scheduler/scheduler.go +++ b/plugin/cmd/kube-scheduler/scheduler.go @@ -60,8 +60,11 @@ func main() { go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) - configFactory := &factory.ConfigFactory{Client: kubeClient} - config := configFactory.Create() + configFactory := factory.NewConfigFactory(kubeClient) + config, err := configFactory.Create(nil, nil) + if err != nil { + glog.Fatalf("Failed to create scheduler configuration: %v", err) + } s := scheduler.New(config) s.Run() diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index b827edc4148..6fd88fbf2dd 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -40,46 +40,77 @@ import ( // ConfigFactory knows how to fill out a scheduler config with its support functions. 
type ConfigFactory struct { Client *client.Client + // queue for pods that need scheduling + PodQueue *cache.FIFO + // a means to list all scheduled pods + PodLister *storeToPodLister + // a means to list all minions + MinionLister *storeToMinionLister + // map of strings to predicate functions to be used + // to filter the minions for scheduling pods + PredicateMap map[string]algorithm.FitPredicate + // map of strings to priority config to be used + // to prioritize the filtered minions for scheduling pods + PriorityMap map[string]algorithm.PriorityConfig +} + +// NewConfigFactory initializes the factory. +func NewConfigFactory(client *client.Client) *ConfigFactory { + // initialize the factory struct + factory := &ConfigFactory{ + Client: client, + PodQueue: cache.NewFIFO(), + PodLister: &storeToPodLister{cache.NewStore()}, + MinionLister: &storeToMinionLister{cache.NewStore()}, + PredicateMap: make(map[string]algorithm.FitPredicate), + PriorityMap: make(map[string]algorithm.PriorityConfig), + } + + factory.addDefaultPredicates() + factory.addDefaultPriorities() + + return factory } // Create creates a scheduler and all support functions. -func (factory *ConfigFactory) Create() *scheduler.Config { +func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) { + if predicateKeys == nil { + glog.V(2).Infof("Custom predicates list not provided, using default predicates") + predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"} + } + predicateFuncs, err := factory.getPredicateFunctions(predicateKeys) + if err != nil { + return nil, err + } + + if priorityKeys == nil { + glog.V(2).Infof("Custom priority list not provided, using default priority: LeastRequestedPriority") + priorityKeys = []string{"LeastRequestedPriority"} + } + priorityConfigs, err := factory.getPriorityConfigs(priorityKeys) + if err != nil { + return nil, err + } + // Watch and queue pods that need scheduling. - podQueue := cache.NewFIFO() - cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run() + cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run() // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. - podCache := cache.NewStore() - cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run() + cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. - minionCache := cache.NewStore() if false { // Disable this code until minions support watches. - cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run() + cache.NewReflector(factory.createMinionLW(), &api.Minion{}, factory.MinionLister.Store).Run() } else { - cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run() + cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run() } r := rand.New(rand.NewSource(time.Now().UnixNano())) - minionLister := &storeToMinionLister{minionCache} - algo := algorithm.NewGenericScheduler( - []algorithm.FitPredicate{ - // Fit is defined based on the absence of port conflicts. 
- algorithm.PodFitsPorts, - // Fit is determined by resource availability - algorithm.NewResourceFitPredicate(minionLister), - // Fit is determined by non-conflicting disk volumes - algorithm.NoDiskConflict, - // Fit is determined by node selector query - algorithm.NewSelectorMatchPredicate(minionLister), - }, - // Prioritize nodes by least requested utilization. - algorithm.LeastRequestedPriority, - &storeToPodLister{podCache}, r) + algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, factory.PodLister, r) podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{}, @@ -87,19 +118,77 @@ func (factory *ConfigFactory) Create() *scheduler.Config { } return &scheduler.Config{ - MinionLister: minionLister, + MinionLister: factory.MinionLister, Algorithm: algo, Binder: &binder{factory.Client}, NextPod: func() *api.Pod { - pod := podQueue.Pop().(*api.Pod) - glog.V(2).Infof("About to try and schedule pod %v\n"+ - "\tknown minions: %v\n"+ - "\tknown scheduled pods: %v\n", - pod.Name, minionCache.ContainedIDs(), podCache.ContainedIDs()) + pod := factory.PodQueue.Pop().(*api.Pod) + glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name) return pod }, - Error: factory.makeDefaultErrorFunc(&podBackoff, podQueue), + Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue), + }, nil +} + +func (factory *ConfigFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) { + predicates := []algorithm.FitPredicate{} + for _, key := range keys { + function, ok := factory.PredicateMap[key] + if !ok { + return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key) + } + predicates = append(predicates, function) } + return predicates, nil +} + +func (factory *ConfigFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) { + configs := []algorithm.PriorityConfig{} + for _, key := range keys { + config, ok := factory.PriorityMap[key] + if !ok { + return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key) + } + configs = append(configs, config) + } + return configs, nil +} + +func (factory *ConfigFactory) addDefaultPredicates() { + // Fit is defined based on the absence of port conflicts. + factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts) + // Fit is determined by resource availability + factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)) + // Fit is determined by non-conflicting disk volumes + factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict) + // Fit is determined by node selector query + factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)) +} + +func (factory *ConfigFactory) AddPredicate(key string, function algorithm.FitPredicate) { + factory.PredicateMap[key] = function +} + +func (factory *ConfigFactory) addDefaultPriorities() { + // Prioritize nodes by least requested utilization. + factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1) + // spreads pods by minimizing the number of pods on the same minion with the same labels. 
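
As a worked illustration of that spreading score (the 10 * (maxCount - count) / maxCount formula from CalculateSpreadPriority above, with made-up pod counts):

package main

import "fmt"

// spreadScore reproduces the 0-10 spreading formula for illustration:
// minions hosting fewer matching pods score higher. With 1 matching pod on
// machine1 and 2 on machine2, this yields the 5 / 0 split seen in the
// spreading tests above.
func spreadScore(count, maxCount int) int {
	if maxCount == 0 {
		return 10 // no matching pods anywhere: every minion is equally good
	}
	return int(10 * (float32(maxCount-count) / float32(maxCount)))
}

func main() {
	counts := map[string]int{"machine1": 1, "machine2": 2}
	maxCount := 2
	for minion, count := range counts {
		fmt.Printf("%s: %d\n", minion, spreadScore(count, maxCount))
	}
}
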
+ factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1) + // EqualPriority is a prioritizer function that gives an equal weight of one to all minions + factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0) +} + +func (factory *ConfigFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) { + factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight} +} + +func (factory *ConfigFactory) SetWeight(key string, weight int) { + config, ok := factory.PriorityMap[key] + if !ok { + glog.Errorf("Invalid priority key %s specified - no corresponding function found", key) + return + } + config.Weight = weight } type listWatch struct { diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index d1bb337d6ca..6c7d504c495 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -42,12 +42,12 @@ func TestCreate(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - factory := ConfigFactory{client} - factory.Create() + factory := NewConfigFactory(client) + factory.Create(nil, nil) } func TestCreateLists(t *testing.T) { - factory := ConfigFactory{nil} + factory := NewConfigFactory(nil) table := []struct { location string factory func() *listWatch @@ -85,7 +85,7 @@ func TestCreateLists(t *testing.T) { } func TestCreateWatches(t *testing.T) { - factory := ConfigFactory{nil} + factory := NewConfigFactory(nil) table := []struct { rv string location string @@ -136,6 +136,7 @@ func TestCreateWatches(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) + // This test merely tests that the correct request is made. item.factory().Watch(item.rv) handler.ValidateRequest(t, item.location, "GET", nil) @@ -167,7 +168,7 @@ func TestPollMinions(t *testing.T) { server := httptest.NewServer(mux) defer server.Close() client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - cf := ConfigFactory{client} + cf := NewConfigFactory(client) ce, err := cf.pollMinions() if err != nil { @@ -194,7 +195,7 @@ func TestDefaultErrorFunc(t *testing.T) { mux.Handle("/api/"+testapi.Version()+"/pods/foo", &handler) server := httptest.NewServer(mux) defer server.Close() - factory := ConfigFactory{client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})} + factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})) queue := cache.NewFIFO() podBackoff := podBackoff{ perPodBackoff: map[string]*backoffEntry{},