diff --git a/pkg/scheduler/generic_scheduler.go b/pkg/scheduler/generic_scheduler.go new file mode 100644 index 00000000000..cecc130a76f --- /dev/null +++ b/pkg/scheduler/generic_scheduler.go @@ -0,0 +1,110 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "math/rand" + "sort" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +type genericScheduler struct { + predicates []FitPredicate + prioritizer PriorityFunction + pods PodLister + random *rand.Rand + randomLock sync.Mutex +} + +func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { + minions, err := minionLister.List() + if err != nil { + return "", err + } + filteredNodes, err := findNodesThatFit(pod, g.pods, g.predicates, minions) + if err != nil { + return "", err + } + priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes)) + if err != nil { + return "", err + } + if len(priorityList) == 0 { + return "", fmt.Errorf("failed to find a fit for pod: %v", pod) + } + return g.selectHost(priorityList) +} + +func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, error) { + sort.Sort(priorityList) + + hosts := getMinHosts(priorityList) + g.randomLock.Lock() + defer g.randomLock.Unlock() + + ix := g.random.Int() % len(hosts) + return hosts[ix], nil +} + +func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicate, nodes []string) ([]string, error) { + filtered := []string{} + machineToPods, err := MapPodsToMachines(podLister) + if err != nil { + return nil, err + } + for _, node := range nodes { + fits := true + for _, predicate := range predicates { + fit, err := predicate(pod, machineToPods[node], node) + if err != nil { + return nil, err + } + if !fit { + fits = false + break + } + } + if fits { + filtered = append(filtered, node) + } + } + return filtered, nil +} + +func getMinHosts(list HostPriorityList) []string { + result := []string{} + for _, hostEntry := range list { + if hostEntry.score == list[0].score { + result = append(result, hostEntry.host) + } else { + break + } + } + return result +} + +func NewGenericScheduler(predicates []FitPredicate, prioritizer PriorityFunction, pods PodLister, random *rand.Rand) Scheduler { + return &genericScheduler{ + predicates: predicates, + prioritizer: prioritizer, + pods: pods, + random: random, + } +} diff --git a/pkg/scheduler/generic_scheduler_test.go b/pkg/scheduler/generic_scheduler_test.go new file mode 100644 index 00000000000..31ef5730c42 --- /dev/null +++ b/pkg/scheduler/generic_scheduler_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "math/rand" + "strconv" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +func matchesPredicate(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + return pod.ID == node, nil +} + +func evenPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + nodes, err := minionLister.List() + result := []HostPriority{} + + if err != nil { + fmt.Errorf("failed to list nodes: %v", err) + return []HostPriority{}, err + } + for _, minion := range nodes { + result = append(result, HostPriority{ + host: minion, + score: 1, + }) + } + return result, nil +} + +func numericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + nodes, err := minionLister.List() + result := []HostPriority{} + + if err != nil { + fmt.Errorf("failed to list nodes: %v", err) + return nil, err + } + for _, minion := range nodes { + score, err := strconv.Atoi(minion) + if err != nil { + return nil, err + } + result = append(result, HostPriority{ + host: minion, + score: score, + }) + } + return result, nil +} + +func TestGenericScheduler(t *testing.T) { + tests := []struct { + predicates []FitPredicate + prioritizer PriorityFunction + nodes []string + pod api.Pod + expectedHost string + expectsErr bool + }{ + { + predicates: []FitPredicate{falsePredicate}, + prioritizer: evenPriority, + nodes: []string{"machine1", "machine2"}, + expectsErr: true, + }, + { + predicates: []FitPredicate{truePredicate}, + prioritizer: evenPriority, + nodes: []string{"machine1", "machine2"}, + // Random choice between both, the rand seeded above with zero, chooses "machine2" + expectedHost: "machine2", + }, + { + // Fits on a machine where the pod ID matches the machine name + predicates: []FitPredicate{matchesPredicate}, + prioritizer: evenPriority, + nodes: []string{"machine1", "machine2"}, + pod: api.Pod{JSONBase: api.JSONBase{ID: "machine2"}}, + expectedHost: "machine2", + }, + { + predicates: []FitPredicate{truePredicate}, + prioritizer: numericPriority, + nodes: []string{"3", "2", "1"}, + expectedHost: "1", + }, + { + predicates: []FitPredicate{matchesPredicate}, + prioritizer: numericPriority, + nodes: []string{"3", "2", "1"}, + pod: api.Pod{JSONBase: api.JSONBase{ID: "2"}}, + expectedHost: "2", + }, + { + predicates: []FitPredicate{truePredicate, falsePredicate}, + prioritizer: numericPriority, + nodes: []string{"3", "2", "1"}, + expectsErr: true, + }, + } + + for _, test := range tests { + random := rand.New(rand.NewSource(0)) + scheduler := NewGenericScheduler(test.predicates, test.prioritizer, FakePodLister([]api.Pod{}), random) + machine, err := scheduler.Schedule(test.pod, FakeMinionLister(test.nodes)) + if test.expectsErr { + if err == nil { + t.Error("Unexpected non-error") + } + } else { + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if test.expectedHost != machine { + t.Errorf("Expected: %s, Saw: %s", test.expectedHost, machine) + } + } + } +} diff --git a/pkg/scheduler/randomfit.go b/pkg/scheduler/randomfit.go index 836cbd6c632..7ae9b4fe57d 100644 --- a/pkg/scheduler/randomfit.go +++ b/pkg/scheduler/randomfit.go @@ -25,9 +25,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) -// FitPredicate is a function that indicates if a pod fits into an existing node. -type FitPredicate func(pod api.Pod, existingPods []api.Pod, node string) (bool, error) - // RandomFitScheduler is a Scheduler which schedules a Pod on a random machine which matches its requirement. type RandomFitScheduler struct { podLister PodLister @@ -78,22 +75,32 @@ func containsPort(pod api.Pod, port api.Port) bool { return false } +// MapPodsToMachines obtains a list of pods and pivots that list into a map where the keys are host names +// and the values are the list of pods running on that host. +func MapPodsToMachines(lister PodLister) (map[string][]api.Pod, error) { + machineToPods := map[string][]api.Pod{} + // TODO: perform more targeted query... + pods, err := lister.ListPods(labels.Everything()) + if err != nil { + return map[string][]api.Pod{}, err + } + for _, scheduledPod := range pods { + host := scheduledPod.CurrentState.Host + machineToPods[host] = append(machineToPods[host], scheduledPod) + } + return machineToPods, nil +} + // Schedule schedules a pod on a random machine which matches its requirement. func (s *RandomFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) { machines, err := minionLister.List() if err != nil { return "", err } - machineToPods := map[string][]api.Pod{} - // TODO: perform more targeted query... - pods, err := s.podLister.ListPods(labels.Everything()) + machineToPods, err := MapPodsToMachines(s.podLister) if err != nil { return "", err } - for _, scheduledPod := range pods { - host := scheduledPod.CurrentState.Host - machineToPods[host] = append(machineToPods[host], scheduledPod) - } var machineOptions []string for _, machine := range machines { podFits := true diff --git a/pkg/scheduler/spreading_scheduler.go b/pkg/scheduler/spreading_scheduler.go new file mode 100644 index 00000000000..cabf0702ae4 --- /dev/null +++ b/pkg/scheduler/spreading_scheduler.go @@ -0,0 +1,54 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "math/rand" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +// CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels. +// Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority +// may not provide optimal spreading for the members of that Service. +// TODO: consider if we want to include Service label sets in the scheduling priority. +func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + pods, err := podLister.ListPods(labels.SelectorFromSet(pod.Labels)) + if err != nil { + return nil, err + } + minions, err := minionLister.List() + if err != nil { + return nil, err + } + + counts := map[string]int{} + for _, pod := range pods { + counts[pod.CurrentState.Host]++ + } + + result := []HostPriority{} + for _, minion := range minions { + result = append(result, HostPriority{host: minion, score: counts[minion]}) + } + return result, nil +} + +func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { + return NewGenericScheduler(predicates, CalculateSpreadPriority, podLister, random) +} diff --git a/pkg/scheduler/spreading_scheduler_test.go b/pkg/scheduler/spreading_scheduler_test.go new file mode 100644 index 00000000000..de715e59374 --- /dev/null +++ b/pkg/scheduler/spreading_scheduler_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +func TestSpreadPriority(t *testing.T) { + labels1 := map[string]string{ + "foo": "bar", + "baz": "blah", + } + labels2 := map[string]string{ + "bar": "foo", + "baz": "blah", + } + machine1State := api.PodState{ + Host: "machine1", + } + machine2State := api.PodState{ + Host: "machine2", + } + tests := []struct { + pod api.Pod + pods []api.Pod + nodes []string + expectedList HostPriorityList + test string + }{ + { + nodes: []string{"machine1", "machine2"}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + test: "nothing scheduled", + }, + { + pod: api.Pod{Labels: labels1}, + pods: []api.Pod{{CurrentState: machine1State}}, + nodes: []string{"machine1", "machine2"}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + test: "no labels", + }, + { + pod: api.Pod{Labels: labels1}, + pods: []api.Pod{{CurrentState: machine1State, Labels: labels2}}, + nodes: []string{"machine1", "machine2"}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, + test: "different labels", + }, + { + pod: api.Pod{Labels: labels1}, + pods: []api.Pod{ + {CurrentState: machine1State, Labels: labels2}, + {CurrentState: machine2State, Labels: labels1}, + }, + nodes: []string{"machine1", "machine2"}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 1}}, + test: "one label match", + }, + { + pod: api.Pod{Labels: labels1}, + pods: []api.Pod{ + {CurrentState: machine1State, Labels: labels2}, + {CurrentState: machine1State, Labels: labels1}, + {CurrentState: machine2State, Labels: labels1}, + }, + nodes: []string{"machine1", "machine2"}, + expectedList: []HostPriority{{"machine1", 1}, {"machine2", 1}}, + test: "two label matches on different machines", + }, + { + pod: api.Pod{Labels: labels1}, + pods: []api.Pod{ + {CurrentState: machine1State, Labels: labels2}, + {CurrentState: machine1State, Labels: labels1}, + {CurrentState: machine2State, Labels: labels1}, + {CurrentState: machine2State, Labels: labels1}, + }, + nodes: []string{"machine1", "machine2"}, + expectedList: []HostPriority{{"machine1", 1}, {"machine2", 2}}, + test: "three label matches", + }, + } + + for _, test := range tests { + list, err := CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(test.nodes)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } +} diff --git a/pkg/scheduler/types.go b/pkg/scheduler/types.go new file mode 100644 index 00000000000..cc899c03c97 --- /dev/null +++ b/pkg/scheduler/types.go @@ -0,0 +1,49 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// FitPredicate is a function that indicates if a pod fits into an existing node. +type FitPredicate func(pod api.Pod, existingPods []api.Pod, node string) (bool, error) + +// HostPriority represents the priority of scheduling to a particular host, lower priority is better. +type HostPriority struct { + host string + score int +} + +type HostPriorityList []HostPriority + +func (h HostPriorityList) Len() int { + return len(h) +} + +func (h HostPriorityList) Less(i, j int) bool { + if h[i].score == h[j].score { + return h[i].host < h[j].host + } + return h[i].score < h[j].score +} + +func (h HostPriorityList) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +type PriorityFunction func(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error)