From 9ed8486fd7db8f7581b196f5bb95217bec0d5ea1 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 24 Sep 2014 09:32:36 -0700 Subject: [PATCH] Add a generic scheduler. --- pkg/scheduler/generic_scheduler.go | 104 ++++++++++++++ pkg/scheduler/generic_scheduler_test.go | 162 ++++++++++++++++++++++ pkg/scheduler/randomfit.go | 22 ++- pkg/scheduler/spreading_scheduler.go | 28 +--- pkg/scheduler/spreading_scheduler_test.go | 69 +++++++++ pkg/scheduler/types.go | 9 ++ 6 files changed, 363 insertions(+), 31 deletions(-) create mode 100644 pkg/scheduler/generic_scheduler.go create mode 100644 pkg/scheduler/generic_scheduler_test.go create mode 100644 pkg/scheduler/spreading_scheduler_test.go diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go new file mode 100644 index 00000000000..d1974ca660b --- /dev/null +++ b/pkg/scheduler/generic_scheduler.go @@ -0,0 +1,104 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "math/rand" + "sort" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +type genericScheduler struct { + predicates []FitPredicate + prioritizer PriorityFunction + pods PodLister + random *rand.Rand + randomLock sync.Mutex +} + +type listMinionLister struct { + nodes []string +} + +func (l *listMinionLister) List() ([]string, error) { + return l.nodes, nil +} + +func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { + minions, err := minionLister.List() + if err != nil { + return "", err + } + filtered := []string{} + machineToPods, err := MapPodsToMachines(g.pods) + if err != nil { + return "", err + } + for _, minion := range minions { + fits := true + for _, predicate := range g.predicates { + fit, err := predicate(pod, machineToPods[minion], minion) + if err != nil { + return "", err + } + if !fit { + fits = false + break + } + } + if fits { + filtered = append(filtered, minion) + } + } + priorityList, err := g.prioritizer(pod, g.pods, &listMinionLister{filtered}) + if err != nil { + return "", err + } + if len(priorityList) == 0 { + return "", fmt.Errorf("failed to find a fit for pod: %v", pod) + } + sort.Sort(priorityList) + + hosts := getMinHosts(priorityList) + g.randomLock.Lock() + defer g.randomLock.Unlock() + + ix := g.random.Int() % len(hosts) + return hosts[ix], nil +} + +func getMinHosts(list HostPriorityList) []string { + result := []string{} + for _, hostEntry := range list { + if hostEntry.score == list[0].score { + result = append(result, hostEntry.host) + } + } + return result +} + +func NewGenericScheduler(predicates []FitPredicate, prioritizer PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { + return &genericScheduler{ + predicates: predicates, + prioritizer: prioritizer, + pods: pods, + random: random, + } +} diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go new file mode 100644 index 00000000000..c0cbb8b65e4 --- /dev/null +++ b/pkg/scheduler/generic_scheduler_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "math/rand" + "strconv" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +func falsePredicate(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + return false, nil +} + +func truePredicate(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + return true, nil +} + +func matchesPredicate(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + return pod.ID == node, nil +} + +func evenPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + nodes, err := minionLister.List() + result := []HostPriority{} + + if err != nil { + fmt.Errorf("failed to list nodes: %v", err) + return []HostPriority{}, err + } + for _, minion := range nodes { + result = append(result, HostPriority{ + host: minion, + score: 1, + }) + } + return result, nil +} + +func numericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + nodes, err := minionLister.List() + result := []HostPriority{} + + if err != nil { + fmt.Errorf("failed to list nodes: %v", err) + return nil, err + } + for _, minion := range nodes { + score, err := strconv.Atoi(minion) + if err != nil { + return nil, err + } + result = append(result, HostPriority{ + host: minion, + score: score, + }) + } + return result, nil +} + +func TestGenericScheduler(t *testing.T) { + tests := []struct { + predicates []FitPredicate + prioritizer PriorityFunction + nodes []string + existingPods []api.Pod + pod api.Pod + expectedHost string + expectsErr bool + }{ + { + []FitPredicate{falsePredicate}, + evenPriority, + []string{"machine1", "machine2"}, + []api.Pod{}, + api.Pod{}, + "", + true, + }, + { + []FitPredicate{truePredicate}, + evenPriority, + []string{"machine1", "machine2"}, + []api.Pod{}, + api.Pod{}, + // Random choice between both, the rand seeded above with zero, chooses "machine2" + "machine2", + false, + }, + { + []FitPredicate{matchesPredicate}, + evenPriority, + []string{"machine1", "machine2"}, + []api.Pod{}, + api.Pod{JSONBase: api.JSONBase{ID: "machine2"}}, + "machine2", + false, + }, + { + []FitPredicate{truePredicate}, + numericPriority, + []string{"3", "2", "1"}, + []api.Pod{}, + api.Pod{}, + "1", + false, + }, + { + []FitPredicate{matchesPredicate}, + numericPriority, + []string{"3", "2", "1"}, + []api.Pod{}, + api.Pod{JSONBase: api.JSONBase{ID: "2"}}, + "2", + false, + }, + { + []FitPredicate{truePredicate, falsePredicate}, + numericPriority, + []string{"3", "2", "1"}, + []api.Pod{}, + api.Pod{}, + "", + true, + }, + } + + for _, test := range tests { + random := rand.New(rand.NewSource(0)) + scheduler := NewGenericScheduler(test.predicates, test.prioritizer, FakePodLister(test.existingPods), random) + machine, err := scheduler.Schedule(test.pod, &listMinionLister{nodes: test.nodes}) + if test.expectsErr { + if err == nil { + t.Error("Unexpected non-error") + } + } else { + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if test.expectedHost != machine { + t.Errorf("Expected: %s, Saw: %s", test.expectedHost, machine) + } + } + } +} diff --git a/pkg/scheduler/randomfit.go b/pkg/scheduler/randomfit.go index 836cbd6c632..03c641133a0 100644 --- a/pkg/scheduler/randomfit.go +++ b/pkg/scheduler/randomfit.go @@ -78,22 +78,30 @@ func containsPort(pod api.Pod, port api.Port) bool { return false } +func MapPodsToMachines(lister PodLister) (map[string][]api.Pod, error) { + machineToPods := map[string][]api.Pod{} + // TODO: perform more targeted query... + pods, err := lister.ListPods(labels.Everything()) + if err != nil { + return map[string][]api.Pod{}, err + } + for _, scheduledPod := range pods { + host := scheduledPod.CurrentState.Host + machineToPods[host] = append(machineToPods[host], scheduledPod) + } + return machineToPods, nil +} + // Schedule schedules a pod on a random machine which matches its requirement. func (s *RandomFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { machines, err := minionLister.List() if err != nil { return "", err } - machineToPods := map[string][]api.Pod{} - // TODO: perform more targeted query... - pods, err := s.podLister.ListPods(labels.Everything()) + machineToPods, err := MapPodsToMachines(s.podLister) if err != nil { return "", err } - for _, scheduledPod := range pods { - host := scheduledPod.CurrentState.Host - machineToPods[host] = append(machineToPods[host], scheduledPod) - } var machineOptions []string for _, machine := range machines { podFits := true diff --git a/pkg/scheduler/spreading_scheduler.go b/pkg/scheduler/spreading_scheduler.go index e7f992b8f9a..cabf0702ae4 100644 --- a/pkg/scheduler/spreading_scheduler.go +++ b/pkg/scheduler/spreading_scheduler.go @@ -17,27 +17,18 @@ limitations under the License. package scheduler import ( - "fmt" "math/rand" - "sort" - "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) -type SpreadingScheduler struct { - pods PodLister - random *rand.Rand - randomLock sync.Mutex -} - // CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels. // Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority // may not provide optimal spreading for the members of that Service. // TODO: consider if we want to include Service label sets in the scheduling priority. -func CalculateSpreadPriority(selector labels.Selector, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { - pods, err := podLister.ListPods(selector) +func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + pods, err := podLister.ListPods(labels.SelectorFromSet(pod.Labels)) if err != nil { return nil, err } @@ -58,17 +49,6 @@ func CalculateSpreadPriority(selector labels.Selector, podLister PodLister, mini return result, nil } -// Schedule schedules pods to maximize spreading of identical pods across multiple hosts. -// Does not currently take hostPort scheduling into account. -// TODO: combine priority based and fit based schedulers into a single scheduler. -func (s *SpreadingScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { - priorities, err := CalculateSpreadPriority(labels.SelectorFromSet(pod.Labels), s.pods, minionLister) - if err != nil { - return "", err - } - sort.Sort(priorities) - if len(priorities) == 0 { - return "", fmt.Errorf("failed to find a fit: %v", pod) - } - return priorities[0].host, nil +func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { + return NewGenericScheduler(predicates, CalculateSpreadPriority, podLister, random) } diff --git a/pkg/scheduler/spreading_scheduler_test.go b/pkg/scheduler/spreading_scheduler_test.go new file mode 100644 index 00000000000..59f54c30bc1 --- /dev/null +++ b/pkg/scheduler/spreading_scheduler_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +func TestSpreadPriority(t *testing.T) { + labels1 := map[string]string{ + "foo": "bar", + "baz": "blah", + } + labels2 := map[string]string{ + "bar": "foo", + "baz": "blah", + } + machine1State := api.PodState{ + Host: "machine1", + } + machine2State := api.PodState{ + Host: "machine2", + } + tests := []struct { + pod api.Pod + pods []api.Pod + nodes []string + expectErr bool + expectedList HostPriorityList + test string + }{ + {api.Pod{}, []api.Pod{}, []string{"machine1", "machine2"}, false, []HostPriority{{"machine1", 0}, {"machine2", 0}}, "nothing scheduled"}, + {api.Pod{Labels: labels1}, []api.Pod{{CurrentState: machine1State}}, []string{"machine1", "machine2"}, false, []HostPriority{{"machine1", 0}, {"machine2", 0}}, "no labels"}, + {api.Pod{Labels: labels1}, []api.Pod{{CurrentState: machine1State, Labels: labels2}}, []string{"machine1", "machine2"}, false, []HostPriority{{"machine1", 0}, {"machine2", 0}}, "different labels"}, + {api.Pod{Labels: labels1}, []api.Pod{{CurrentState: machine1State, Labels: labels2}, {CurrentState: machine2State, Labels: labels1}}, []string{"machine1", "machine2"}, false, []HostPriority{{"machine1", 0}, {"machine2", 1}}, "one label match"}, + {api.Pod{Labels: labels1}, []api.Pod{{CurrentState: machine1State, Labels: labels2}, {CurrentState: machine1State, Labels: labels1}, {CurrentState: machine2State, Labels: labels1}}, []string{"machine1", "machine2"}, false, []HostPriority{{"machine1", 1}, {"machine2", 1}}, "two label matches on different machines"}, + {api.Pod{Labels: labels1}, []api.Pod{{CurrentState: machine1State, Labels: labels2}, {CurrentState: machine1State, Labels: labels1}, {CurrentState: machine2State, Labels: labels1}, {CurrentState: machine2State, Labels: labels1}}, []string{"machine1", "machine2"}, false, []HostPriority{{"machine1", 1}, {"machine2", 2}}, "three label matches"}, + } + + for _, test := range tests { + list, err := CalculateSpreadPriority(test.pod, FakePodLister(test.pods), &listMinionLister{test.nodes}) + if test.expectErr { + if err == nil { + t.Errorf("%s: unexpected non-error", test.test) + } + } else { + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } + } +} diff --git a/pkg/scheduler/types.go b/pkg/scheduler/types.go index 90d7b642ec8..cc899c03c97 100644 --- a/pkg/scheduler/types.go +++ b/pkg/scheduler/types.go @@ -16,6 +16,13 @@ limitations under the License. package scheduler +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// FitPredicate is a function that indicates if a pod fits into an existing node. +type FitPredicate func(pod api.Pod, existingPods []api.Pod, node string) (bool, error) + // HostPriority represents the priority of scheduling to a particular host, lower priority is better. type HostPriority struct { host string @@ -38,3 +45,5 @@ func (h HostPriorityList) Less(i, j int) bool { func (h HostPriorityList) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +type PriorityFunction func(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error)