Implement minion registry. Minions now a first-class object.

This commit is contained in:
Daniel Smith 2014-06-19 18:31:38 -07:00
parent d460c01ade
commit 79ee5aa250
9 changed files with 70 additions and 43 deletions

View File

@ -39,7 +39,7 @@ func main() {
servers := []string{"http://localhost:4001"} servers := []string{"http://localhost:4001"}
log.Printf("Creating etcd client pointing to %v", servers) log.Printf("Creating etcd client pointing to %v", servers)
etcdClient := etcd.NewClient(servers) etcdClient := etcd.NewClient(servers)
machineList := []string{"machine"} machineList := registry.MakeMinionRegistry([]string{"machine"})
reg := registry.MakeEtcdRegistry(etcdClient, machineList) reg := registry.MakeEtcdRegistry(etcdClient, machineList)

View File

@ -34,8 +34,8 @@ type Master struct {
podRegistry registry.PodRegistry podRegistry registry.PodRegistry
controllerRegistry registry.ControllerRegistry controllerRegistry registry.ControllerRegistry
serviceRegistry registry.ServiceRegistry serviceRegistry registry.ServiceRegistry
minionRegistry registry.MinionRegistry
minions []string
random *rand.Rand random *rand.Rand
storage map[string]apiserver.RESTStorage storage map[string]apiserver.RESTStorage
} }
@ -46,37 +46,40 @@ func NewMemoryServer(minions []string, cloud cloudprovider.Interface) *Master {
podRegistry: registry.MakeMemoryRegistry(), podRegistry: registry.MakeMemoryRegistry(),
controllerRegistry: registry.MakeMemoryRegistry(), controllerRegistry: registry.MakeMemoryRegistry(),
serviceRegistry: registry.MakeMemoryRegistry(), serviceRegistry: registry.MakeMemoryRegistry(),
minionRegistry: registry.MakeMinionRegistry(minions),
} }
m.init(minions, cloud) m.init(cloud)
return m return m
} }
// Returns a new apiserver. // Returns a new apiserver.
func New(etcdServers, minions []string, cloud cloudprovider.Interface) *Master { func New(etcdServers, minions []string, cloud cloudprovider.Interface) *Master {
etcdClient := etcd.NewClient(etcdServers) etcdClient := etcd.NewClient(etcdServers)
minionRegistry := registry.MakeMinionRegistry(minions)
m := &Master{ m := &Master{
podRegistry: registry.MakeEtcdRegistry(etcdClient, minions), podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minions), controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minions), serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
minionRegistry: minionRegistry,
} }
m.init(minions, cloud) m.init(cloud)
return m return m
} }
func (m *Master) init(minions []string, cloud cloudprovider.Interface) { func (m *Master) init(cloud cloudprovider.Interface) {
containerInfo := &client.HTTPContainerInfo{ containerInfo := &client.HTTPContainerInfo{
Client: http.DefaultClient, Client: http.DefaultClient,
Port: 10250, Port: 10250,
} }
m.minions = minions
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30) podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30)
go podCache.Loop() go podCache.Loop()
m.storage = map[string]apiserver.RESTStorage{ m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random), cloud, podCache), "pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minionRegistry, m.podRegistry, m.random), cloud, podCache),
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry), "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions), "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"minions": registry.MakeMinionRegistryStorage(m.minionRegistry),
} }
} }

View File

@ -32,7 +32,7 @@ import (
// EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd. // EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd.
type EtcdRegistry struct { type EtcdRegistry struct {
etcdClient util.EtcdClient etcdClient util.EtcdClient
machines []string machines MinionRegistry
manifestFactory ManifestFactory manifestFactory ManifestFactory
} }
@ -40,7 +40,7 @@ type EtcdRegistry struct {
// 'client' is the connection to etcd // 'client' is the connection to etcd
// 'machines' is the list of machines // 'machines' is the list of machines
// 'scheduler' is the scheduling algorithm to use. // 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry { func MakeEtcdRegistry(client util.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{ registry := &EtcdRegistry{
etcdClient: client, etcdClient: client,
machines: machines, machines: machines,
@ -61,7 +61,11 @@ func (registry *EtcdRegistry) helper() *util.EtcdHelper {
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
pods := []api.Pod{} pods := []api.Pod{}
for _, machine := range registry.machines { machines, err := registry.machines.List()
if err != nil {
return nil, err
}
for _, machine := range machines {
var machinePods []api.Pod var machinePods []api.Pod
err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods) err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil { if err != nil {
@ -175,7 +179,11 @@ func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.P
} }
func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) { func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
for _, machine := range registry.machines { machines, err := registry.machines.List()
if err != nil {
return api.Pod{}, "", err
}
for _, machine := range machines {
pod, err := registry.getPodForMachine(machine, podID) pod, err := registry.getPodForMachine(machine, podID)
if err == nil { if err == nil {
return pod, machine, nil return pod, machine, nil

View File

@ -28,7 +28,7 @@ import (
) )
func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry { func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry {
registry := MakeEtcdRegistry(client, machines) registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines))
registry.manifestFactory = &BasicManifestFactory{ registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{}, serviceRegistry: &MockServiceRegistry{},
} }

View File

@ -92,7 +92,7 @@ func TestMinionRegistryStorage(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("got error calling List") t.Errorf("got error calling List")
} }
if !reflect.DeepEqual(list.(string), []string{"baz", "foo"}) { if !reflect.DeepEqual(list.([]string), []string{"baz", "foo"}) {
t.Errorf("Unexpected list value: %#v", list) t.Errorf("Unexpected list value: %#v", list)
} }
} }

View File

@ -31,11 +31,11 @@ type Scheduler interface {
// RandomScheduler choses machines uniformly at random. // RandomScheduler choses machines uniformly at random.
type RandomScheduler struct { type RandomScheduler struct {
machines []string machines MinionRegistry
random rand.Rand random rand.Rand
} }
func MakeRandomScheduler(machines []string, random rand.Rand) Scheduler { func MakeRandomScheduler(machines MinionRegistry, random rand.Rand) Scheduler {
return &RandomScheduler{ return &RandomScheduler{
machines: machines, machines: machines,
random: random, random: random,
@ -43,35 +43,43 @@ func MakeRandomScheduler(machines []string, random rand.Rand) Scheduler {
} }
func (s *RandomScheduler) Schedule(pod api.Pod) (string, error) { func (s *RandomScheduler) Schedule(pod api.Pod) (string, error) {
return s.machines[s.random.Int()%len(s.machines)], nil machines, err := s.machines.List()
if err != nil {
return "", err
}
return machines[s.random.Int()%len(machines)], nil
} }
// RoundRobinScheduler chooses machines in order. // RoundRobinScheduler chooses machines in order.
type RoundRobinScheduler struct { type RoundRobinScheduler struct {
machines []string machines MinionRegistry
currentIndex int currentIndex int
} }
func MakeRoundRobinScheduler(machines []string) Scheduler { func MakeRoundRobinScheduler(machines MinionRegistry) Scheduler {
return &RoundRobinScheduler{ return &RoundRobinScheduler{
machines: machines, machines: machines,
currentIndex: 0, currentIndex: -1,
} }
} }
func (s *RoundRobinScheduler) Schedule(pod api.Pod) (string, error) { func (s *RoundRobinScheduler) Schedule(pod api.Pod) (string, error) {
result := s.machines[s.currentIndex] machines, err := s.machines.List()
s.currentIndex = (s.currentIndex + 1) % len(s.machines) if err != nil {
return "", err
}
s.currentIndex = (s.currentIndex + 1) % len(machines)
result := machines[s.currentIndex]
return result, nil return result, nil
} }
type FirstFitScheduler struct { type FirstFitScheduler struct {
machines []string machines MinionRegistry
registry PodRegistry registry PodRegistry
random *rand.Rand random *rand.Rand
} }
func MakeFirstFitScheduler(machines []string, registry PodRegistry, random *rand.Rand) Scheduler { func MakeFirstFitScheduler(machines MinionRegistry, registry PodRegistry, random *rand.Rand) Scheduler {
return &FirstFitScheduler{ return &FirstFitScheduler{
machines: machines, machines: machines,
registry: registry, registry: registry,
@ -91,6 +99,10 @@ func (s *FirstFitScheduler) containsPort(pod api.Pod, port api.Port) bool {
} }
func (s *FirstFitScheduler) Schedule(pod api.Pod) (string, error) { func (s *FirstFitScheduler) Schedule(pod api.Pod) (string, error) {
machines, err := s.machines.List()
if err != nil {
return "", err
}
machineToPods := map[string][]api.Pod{} machineToPods := map[string][]api.Pod{}
pods, err := s.registry.ListPods(labels.Everything()) pods, err := s.registry.ListPods(labels.Everything())
if err != nil { if err != nil {
@ -101,7 +113,7 @@ func (s *FirstFitScheduler) Schedule(pod api.Pod) (string, error) {
machineToPods[host] = append(machineToPods[host], scheduledPod) machineToPods[host] = append(machineToPods[host], scheduledPod)
} }
var machineOptions []string var machineOptions []string
for _, machine := range s.machines { for _, machine := range machines {
podFits := true podFits := true
for _, scheduledPod := range machineToPods[machine] { for _, scheduledPod := range machineToPods[machine] {
for _, container := range pod.DesiredState.Manifest.Containers { for _, container := range pod.DesiredState.Manifest.Containers {

View File

@ -32,7 +32,7 @@ func expectSchedule(scheduler Scheduler, pod api.Pod, expected string, t *testin
} }
func TestRoundRobinScheduler(t *testing.T) { func TestRoundRobinScheduler(t *testing.T) {
scheduler := MakeRoundRobinScheduler([]string{"m1", "m2", "m3", "m4"}) scheduler := MakeRoundRobinScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"}))
expectSchedule(scheduler, api.Pod{}, "m1", t) expectSchedule(scheduler, api.Pod{}, "m1", t)
expectSchedule(scheduler, api.Pod{}, "m2", t) expectSchedule(scheduler, api.Pod{}, "m2", t)
expectSchedule(scheduler, api.Pod{}, "m3", t) expectSchedule(scheduler, api.Pod{}, "m3", t)
@ -41,7 +41,7 @@ func TestRoundRobinScheduler(t *testing.T) {
func TestRandomScheduler(t *testing.T) { func TestRandomScheduler(t *testing.T) {
random := rand.New(rand.NewSource(0)) random := rand.New(rand.NewSource(0))
scheduler := MakeRandomScheduler([]string{"m1", "m2", "m3", "m4"}, *random) scheduler := MakeRandomScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"}), *random)
_, err := scheduler.Schedule(api.Pod{}) _, err := scheduler.Schedule(api.Pod{})
expectNoError(t, err) expectNoError(t, err)
} }
@ -49,7 +49,7 @@ func TestRandomScheduler(t *testing.T) {
func TestFirstFitSchedulerNothingScheduled(t *testing.T) { func TestFirstFitSchedulerNothingScheduled(t *testing.T) {
mockRegistry := MockPodRegistry{} mockRegistry := MockPodRegistry{}
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, api.Pod{}, "m3", t) expectSchedule(scheduler, api.Pod{}, "m3", t)
} }
@ -81,7 +81,7 @@ func TestFirstFitSchedulerFirstScheduled(t *testing.T) {
}, },
} }
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, makePod("", 8080), "m3", t) expectSchedule(scheduler, makePod("", 8080), "m3", t)
} }
@ -94,7 +94,7 @@ func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) {
}, },
} }
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, makePod("", 8080, 8081), "m3", t) expectSchedule(scheduler, makePod("", 8080, 8081), "m3", t)
} }
@ -107,7 +107,7 @@ func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) {
}, },
} }
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
_, err := scheduler.Schedule(makePod("", 8080, 8081)) _, err := scheduler.Schedule(makePod("", 8080, 8081))
if err == nil { if err == nil {
t.Error("Unexpected non-error.") t.Error("Unexpected non-error.")

View File

@ -30,14 +30,14 @@ import (
type ServiceRegistryStorage struct { type ServiceRegistryStorage struct {
registry ServiceRegistry registry ServiceRegistry
cloud cloudprovider.Interface cloud cloudprovider.Interface
hosts []string machines MinionRegistry
} }
func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, hosts []string) apiserver.RESTStorage { func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, machines MinionRegistry) apiserver.RESTStorage {
return &ServiceRegistryStorage{ return &ServiceRegistryStorage{
registry: registry, registry: registry,
cloud: cloud, cloud: cloud,
hosts: hosts, machines: machines,
} }
} }
@ -117,7 +117,11 @@ func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, e
balancer, ok = sr.cloud.TCPLoadBalancer() balancer, ok = sr.cloud.TCPLoadBalancer()
} }
if ok && balancer != nil { if ok && balancer != nil {
err := balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, sr.hosts) hosts, err := sr.machines.List()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -29,7 +29,7 @@ func TestServiceRegistry(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{} fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{ svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
@ -51,7 +51,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{} fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{ svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
@ -76,7 +76,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
} }
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{ svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
@ -99,7 +99,7 @@ func TestServiceRegistryDelete(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{} fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{ svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},
@ -123,7 +123,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{} fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{ svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"}, JSONBase: api.JSONBase{ID: "foo"},