Fix up usage and tests, split into multiple files.

Doing this in multiple commits in an attempt to preserve the file movement history.
This commit is contained in:
Daniel Smith 2014-06-28 15:35:51 -07:00
parent 21e63cf75a
commit 0760e9bc2c
12 changed files with 444 additions and 192 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
@ -37,6 +38,7 @@ type Master struct {
serviceRegistry registry.ServiceRegistry
minionRegistry registry.MinionRegistry
// TODO: don't reuse non-threadsafe objects.
random *rand.Rand
storage map[string]apiserver.RESTStorage
}
@ -86,8 +88,9 @@ func (m *Master) init(cloud cloudprovider.Interface) {
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30)
go podCache.Loop()
s := scheduler.MakeFirstFitScheduler(m.podRegistry, m.random)
m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minionRegistry, m.podRegistry, m.random), cloud, podCache),
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, s, m.minionRegistry, cloud, podCache),
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"minions": registry.MakeMinionRegistryStorage(m.minionRegistry),

View File

@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/golang/glog"
)
@ -33,22 +34,30 @@ type PodRegistryStorage struct {
registry PodRegistry
containerInfo client.ContainerInfo
podCache client.ContainerInfo
scheduler Scheduler
scheduler scheduler.Scheduler
minionLister scheduler.MinionLister
cloud cloudprovider.Interface
}
// MakePodRegistryStorage makes a RESTStorage object for a pod registry.
// Parameters:
// registry The pod registry
// containerInfo Source of fresh container info
// scheduler The scheduler for assigning pods to machines
// cloud Interface to a cloud provider (may be null)
// podCache Source of cached container info
func MakePodRegistryStorage(registry PodRegistry, containerInfo client.ContainerInfo, scheduler Scheduler, cloud cloudprovider.Interface, podCache client.ContainerInfo) apiserver.RESTStorage {
// registry: The pod registry
// containerInfo: Source of fresh container info
// scheduler: The scheduler for assigning pods to machines
// minionLister: Object which can list available minions for the scheduler
// cloud: Interface to a cloud provider (may be null)
// podCache: Source of cached container info
func MakePodRegistryStorage(registry PodRegistry,
containerInfo client.ContainerInfo,
scheduler scheduler.Scheduler,
minionLister scheduler.MinionLister,
cloud cloudprovider.Interface,
podCache client.ContainerInfo) apiserver.RESTStorage {
return &PodRegistryStorage{
registry: registry,
containerInfo: containerInfo,
scheduler: scheduler,
minionLister: minionLister,
cloud: cloud,
podCache: podCache,
}
@ -150,7 +159,7 @@ func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{},
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO(lavalamp): Separate scheduler more cleanly.
machine, err := storage.scheduler.Schedule(pod)
machine, err := storage.scheduler.Schedule(pod, storage.minionLister)
if err != nil {
return nil, err
}

19
pkg/scheduler/doc.go Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package scheduler contains a generic Scheduler interface and several
// implementations.
package scheduler

86
pkg/scheduler/firstfit.go Normal file
View File

@ -0,0 +1,86 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"math/rand"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
type FirstFitScheduler struct {
podLister PodLister
// TODO: *rand.Rand is *not* threadsafe
random *rand.Rand
}
func MakeFirstFitScheduler(podLister PodLister, random *rand.Rand) Scheduler {
return &FirstFitScheduler{
podLister: podLister,
random: random,
}
}
func (s *FirstFitScheduler) containsPort(pod api.Pod, port api.Port) bool {
for _, container := range pod.DesiredState.Manifest.Containers {
for _, podPort := range container.Ports {
if podPort.HostPort == port.HostPort {
return true
}
}
}
return false
}
func (s *FirstFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
machines, err := minionLister.List()
if err != nil {
return "", err
}
machineToPods := map[string][]api.Pod{}
pods, err := s.podLister.ListPods(labels.Everything())
if err != nil {
return "", err
}
for _, scheduledPod := range pods {
host := scheduledPod.CurrentState.Host
machineToPods[host] = append(machineToPods[host], scheduledPod)
}
var machineOptions []string
for _, machine := range machines {
podFits := true
for _, scheduledPod := range machineToPods[machine] {
for _, container := range pod.DesiredState.Manifest.Containers {
for _, port := range container.Ports {
if s.containsPort(scheduledPod, port) {
podFits = false
}
}
}
}
if podFits {
machineOptions = append(machineOptions, machine)
}
}
if len(machineOptions) == 0 {
return "", fmt.Errorf("failed to find fit for %#v", pod)
} else {
return machineOptions[s.random.Int()%len(machineOptions)], nil
}
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"math/rand"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestFirstFitSchedulerNothingScheduled(t *testing.T) {
fakeRegistry := FakePodLister{}
r := rand.New(rand.NewSource(0))
st := schedulerTester{
t: t,
scheduler: MakeFirstFitScheduler(&fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"},
}
st.expectSchedule(api.Pod{}, "m3")
}
func TestFirstFitSchedulerFirstScheduled(t *testing.T) {
fakeRegistry := FakePodLister{
makePod("m1", 8080),
}
r := rand.New(rand.NewSource(0))
st := schedulerTester{
t: t,
scheduler: MakeFirstFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"},
}
st.expectSchedule(makePod("", 8080), "m3")
}
func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) {
fakeRegistry := FakePodLister{
makePod("m1", 80, 8080),
makePod("m2", 8081, 8082, 8083),
makePod("m3", 80, 443, 8085),
}
r := rand.New(rand.NewSource(0))
st := schedulerTester{
t: t,
scheduler: MakeFirstFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"},
}
st.expectSchedule(makePod("", 8080, 8081), "m3")
}
func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) {
fakeRegistry := FakePodLister{
makePod("m1", 8080),
makePod("m2", 8081),
makePod("m3", 8080),
}
r := rand.New(rand.NewSource(0))
st := schedulerTester{
t: t,
scheduler: MakeFirstFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"},
}
st.expectFailure(makePod("", 8080, 8081))
}

52
pkg/scheduler/listers.go Normal file
View File

@ -0,0 +1,52 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
// Anything that can list minions for a scheduler.
type MinionLister interface {
List() (machines []string, err error)
}
// Make a MinionLister from a []string
type FakeMinionLister []string
// Returns minions as a []string
func (f FakeMinionLister) List() ([]string, error) {
return []string(f), nil
}
// Anything that can list pods for a scheduler
type PodLister interface {
ListPods(labels.Selector) ([]api.Pod, error)
}
// Make a MinionLister from an []api.Pods
type FakePodLister []api.Pod
func (f FakePodLister) ListPods(s labels.Selector) (selected []api.Pod, err error) {
for _, pod := range f {
if s.Matches(labels.Set(pod.Labels)) {
selected = append(selected, pod)
}
}
return selected, nil
}

43
pkg/scheduler/random.go Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"math/rand"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// RandomScheduler choses machines uniformly at random.
type RandomScheduler struct {
// TODO: rand.Rand is *NOT* thread safe.
random *rand.Rand
}
func MakeRandomScheduler(random *rand.Rand) Scheduler {
return &RandomScheduler{
random: random,
}
}
func (s *RandomScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
machines, err := minionLister.List()
if err != nil {
return "", err
}
return machines[s.random.Int()%len(machines)], nil
}

View File

@ -0,0 +1,34 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"math/rand"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestRandomScheduler(t *testing.T) {
random := rand.New(rand.NewSource(0))
st := schedulerTester{
t: t,
scheduler: MakeRandomScheduler(random),
minionLister: FakeMinionLister{"m1", "m2", "m3", "m4"},
}
st.expectSuccess(api.Pod{})
}

View File

@ -0,0 +1,42 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// RoundRobinScheduler chooses machines in order.
type RoundRobinScheduler struct {
currentIndex int
}
func MakeRoundRobinScheduler() Scheduler {
return &RoundRobinScheduler{
currentIndex: -1,
}
}
func (s *RoundRobinScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
machines, err := minionLister.List()
if err != nil {
return "", err
}
s.currentIndex = (s.currentIndex + 1) % len(machines)
result := machines[s.currentIndex]
return result, nil
}

View File

@ -0,0 +1,35 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestRoundRobinScheduler(t *testing.T) {
st := schedulerTester{
t: t,
scheduler: MakeRoundRobinScheduler(),
minionLister: FakeMinionLister{"m1", "m2", "m3", "m4"},
}
st.expectSchedule(api.Pod{}, "m1")
st.expectSchedule(api.Pod{}, "m2")
st.expectSchedule(api.Pod{}, "m3")
st.expectSchedule(api.Pod{}, "m4")
}

View File

@ -14,129 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package scheduler
import (
"fmt"
"math/rand"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
// Anything that can list minions for a scheduler.
type MinionLister interface {
List() (machines []string, err error)
}
// Make a MinionLister from a []string
type StringMinionLister []string
func (s StringMinionLister) List() ([]string, error) {
return []string(s), nil
}
// Scheduler is an interface implemented by things that know how to schedule pods onto machines.
// Scheduler is an interface implemented by things that know how to schedule pods
// onto machines.
type Scheduler interface {
Schedule(api.Pod, MinionLister) (string, error)
}
// RandomScheduler choses machines uniformly at random.
type RandomScheduler struct {
random rand.Rand
}
func MakeRandomScheduler(random rand.Rand) Scheduler {
return &RandomScheduler{
random: random,
}
}
func (s *RandomScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
machines, err := minionLister.List()
if err != nil {
return "", err
}
return machines[s.random.Int()%len(machines)], nil
}
// RoundRobinScheduler chooses machines in order.
type RoundRobinScheduler struct {
currentIndex int
}
func MakeRoundRobinScheduler() Scheduler {
return &RoundRobinScheduler{
currentIndex: -1,
}
}
func (s *RoundRobinScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
machines, err := minionLister.List()
if err != nil {
return "", err
}
s.currentIndex = (s.currentIndex + 1) % len(machines)
result := machines[s.currentIndex]
return result, nil
}
type FirstFitScheduler struct {
registry PodRegistry
random *rand.Rand
}
func MakeFirstFitScheduler(registry PodRegistry, random *rand.Rand) Scheduler {
return &FirstFitScheduler{
registry: registry,
random: random,
}
}
func (s *FirstFitScheduler) containsPort(pod api.Pod, port api.Port) bool {
for _, container := range pod.DesiredState.Manifest.Containers {
for _, podPort := range container.Ports {
if podPort.HostPort == port.HostPort {
return true
}
}
}
return false
}
func (s *FirstFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (string, error) {
machines, err := minionLister.List()
if err != nil {
return "", err
}
machineToPods := map[string][]api.Pod{}
pods, err := s.registry.ListPods(labels.Everything())
if err != nil {
return "", err
}
for _, scheduledPod := range pods {
host := scheduledPod.CurrentState.Host
machineToPods[host] = append(machineToPods[host], scheduledPod)
}
var machineOptions []string
for _, machine := range machines {
podFits := true
for _, scheduledPod := range machineToPods[machine] {
for _, container := range pod.DesiredState.Manifest.Containers {
for _, port := range container.Ports {
if s.containsPort(scheduledPod, port) {
podFits = false
}
}
}
}
if podFits {
machineOptions = append(machineOptions, machine)
}
}
if len(machineOptions) == 0 {
return "", fmt.Errorf("failed to find fit for %#v", pod)
} else {
return machineOptions[s.random.Int()%len(machineOptions)], nil
}
Schedule(api.Pod, MinionLister) (selectedMachine string, err error)
}

View File

@ -14,43 +14,49 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package scheduler
import (
"math/rand"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func expectSchedule(scheduler Scheduler, pod api.Pod, expected string, t *testing.T) {
actual, err := scheduler.Schedule(pod)
expectNoError(t, err)
// Some functions used by multiple scheduler tests.
type schedulerTester struct {
t *testing.T
scheduler Scheduler
minionLister MinionLister
}
// Call if you know exactly where pod should get scheduled.
func (st *schedulerTester) expectSchedule(pod api.Pod, expected string) {
actual, err := st.scheduler.Schedule(pod, st.minionLister)
if err != nil {
st.t.Errorf("Unexpected error %v\nTried to scheduler: %#v", err, pod)
return
}
if actual != expected {
t.Errorf("Unexpected scheduling value: %v, expected %v", actual, expected)
st.t.Errorf("Unexpected scheduling value: %v, expected %v", actual, expected)
}
}
func TestRoundRobinScheduler(t *testing.T) {
scheduler := MakeRoundRobinScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"}))
expectSchedule(scheduler, api.Pod{}, "m1", t)
expectSchedule(scheduler, api.Pod{}, "m2", t)
expectSchedule(scheduler, api.Pod{}, "m3", t)
expectSchedule(scheduler, api.Pod{}, "m4", t)
// Call if you can't predict where pod will be scheduled.
func (st *schedulerTester) expectSuccess(pod api.Pod) {
_, err := st.scheduler.Schedule(pod, st.minionLister)
if err != nil {
st.t.Errorf("Unexpected error %v\nTried to scheduler: %#v", err, pod)
return
}
}
func TestRandomScheduler(t *testing.T) {
random := rand.New(rand.NewSource(0))
scheduler := MakeRandomScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"}), *random)
_, err := scheduler.Schedule(api.Pod{})
expectNoError(t, err)
}
func TestFirstFitSchedulerNothingScheduled(t *testing.T) {
mockRegistry := MockPodRegistry{}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, api.Pod{}, "m3", t)
// Call if pod should *not* schedule.
func (st *schedulerTester) expectFailure(pod api.Pod) {
_, err := st.scheduler.Schedule(pod, st.minionLister)
if err == nil {
st.t.Error("Unexpected non-error")
}
}
func makePod(host string, hostPorts ...int) api.Pod {
@ -73,43 +79,3 @@ func makePod(host string, hostPorts ...int) api.Pod {
},
}
}
func TestFirstFitSchedulerFirstScheduled(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []api.Pod{
makePod("m1", 8080),
},
}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, makePod("", 8080), "m3", t)
}
func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []api.Pod{
makePod("m1", 80, 8080),
makePod("m2", 8081, 8082, 8083),
makePod("m3", 80, 443, 8085),
},
}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, makePod("", 8080, 8081), "m3", t)
}
func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []api.Pod{
makePod("m1", 8080),
makePod("m2", 8081),
makePod("m3", 8080),
},
}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
_, err := scheduler.Schedule(makePod("", 8080, 8081))
if err == nil {
t.Error("Unexpected non-error.")
}
}