Merge pull request #961 from lavalamp/removeAssignCall

Remove synchronous assignPod call from create pod
This commit is contained in:
brendandburns 2014-08-25 15:25:27 -07:00
commit 5155222edc
14 changed files with 289 additions and 103 deletions

View File

@ -38,6 +38,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -64,7 +67,7 @@ func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
Port: 10251, Port: 10251,
} }
default: default:
glog.Fatalf("Can't get info for: %v, %v", host, podID) glog.Fatalf("Can't get info for: '%v', '%v'", host, podID)
} }
return c.GetPodInfo("localhost", podID) return c.GetPodInfo("localhost", podID)
} }
@ -106,6 +109,9 @@ func startComponents(manifestURL string) (apiServerURL string) {
storage, codec := m.API_v1beta1() storage, codec := m.API_v1beta1()
handler.delegate = apiserver.Handle(storage, codec, "/api/v1beta1") handler.delegate = apiserver.Handle(storage, codec, "/api/v1beta1")
// Scheduler
scheduler.New((&factory.ConfigFactory{cl}).Create()).Run()
controllerManager := controller.NewReplicationManager(cl) controllerManager := controller.NewReplicationManager(cl)
// Prove that controllerManager's watch works by making it not sync until after this // Prove that controllerManager's watch works by making it not sync until after this

View File

@ -0,0 +1,27 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package constraint
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Allowed returns true if manifests is a collection of manifests
// which can run without conflict on a single minion.
func Allowed(manifests []api.ContainerManifest) bool {
return !PortsConflict(manifests)
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package constraint
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func containerWithHostPorts(ports ...int) api.Container {
c := api.Container{}
for _, p := range ports {
c.Ports = append(c.Ports, api.Port{HostPort: p})
}
return c
}
func manifestWithContainers(containers ...api.Container) api.ContainerManifest {
m := api.ContainerManifest{}
for _, c := range containers {
m.Containers = append(m.Containers, c)
}
return m
}
func TestAllowed(t *testing.T) {
table := []struct {
allowed bool
manifests []api.ContainerManifest
}{
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
containerWithHostPorts(1, 2, 3),
containerWithHostPorts(4, 5, 6),
),
manifestWithContainers(
containerWithHostPorts(7, 8, 9),
containerWithHostPorts(10, 11, 12),
),
},
},
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
manifestWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
},
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
containerWithHostPorts(3, 3),
),
},
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
containerWithHostPorts(6),
),
manifestWithContainers(
containerWithHostPorts(6),
),
},
},
}
for _, item := range table {
if e, a := item.allowed, Allowed(item.manifests); e != a {
t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.manifests)
}
}
}

23
pkg/constraint/doc.go Normal file
View File

@ -0,0 +1,23 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package constraint has functions for ensuring that collections of
// containers are allowed to run together on a single host.
//
// TODO: Add resource math. Phrase this code in a way that makes it easy
// to call from schedulers as well as from the feasiblity check that
// apiserver performs.
package constraint

41
pkg/constraint/ports.go Normal file
View File

@ -0,0 +1,41 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package constraint
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// PortsConflict returns true iff two containers attempt to expose
// the same host port.
func PortsConflict(manifests []api.ContainerManifest) bool {
hostPorts := map[int]struct{}{}
for _, manifest := range manifests {
for _, container := range manifest.Containers {
for _, port := range container.Ports {
if port.HostPort == 0 {
continue
}
if _, exists := hostPorts[port.HostPort]; exists {
return true
}
hostPorts[port.HostPort] = struct{}{}
}
}
}
return false
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package master package master
import ( import (
"math/rand"
"net/http" "net/http"
"time" "time"
@ -32,7 +31,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
goetcd "github.com/coreos/go-etcd/etcd" goetcd "github.com/coreos/go-etcd/etcd"
@ -111,16 +109,12 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client) endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
random := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
s := scheduler.NewRandomFitScheduler(m.podRegistry, random)
m.storage = map[string]apiserver.RESTStorage{ m.storage = map[string]apiserver.RESTStorage{
"pods": pod.NewRegistryStorage(&pod.RegistryStorageConfig{ "pods": pod.NewRegistryStorage(&pod.RegistryStorageConfig{
CloudProvider: cloud, CloudProvider: cloud,
MinionLister: m.minionRegistry,
PodCache: podCache, PodCache: podCache,
PodInfoGetter: podInfoGetter, PodInfoGetter: podInfoGetter,
Registry: m.podRegistry, Registry: m.podRegistry,
Scheduler: s,
}), }),
"replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry), "replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), "services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),

View File

@ -48,6 +48,7 @@ func NewPodCache(info client.PodInfoGetter, pods pod.Registry) *PodCache {
// GetPodInfo Implements the PodInfoGetter.GetPodInfo. // GetPodInfo Implements the PodInfoGetter.GetPodInfo.
// The returned value should be treated as read-only. // The returned value should be treated as read-only.
// TODO: Remove the host from this call, it's totally unnecessary.
func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) {
p.podLock.Lock() p.podLock.Lock()
defer p.podLock.Unlock() defer p.podLock.Unlock()

View File

@ -21,6 +21,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/constraint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -99,19 +100,15 @@ func makeContainerKey(machine string) string {
return "/registry/hosts/" + machine + "/kubelet" return "/registry/hosts/" + machine + "/kubelet"
} }
// CreatePod creates a pod based on a specification, schedule it onto a specific machine. // CreatePod creates a pod based on a specification.
func (r *Registry) CreatePod(machine string, pod api.Pod) error { func (r *Registry) CreatePod(pod api.Pod) error {
// Set current status to "Waiting". // Set current status to "Waiting".
pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = "" pod.CurrentState.Host = ""
// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
pod.DesiredState.Status = api.PodRunning pod.DesiredState.Status = api.PodRunning
pod.DesiredState.Host = "" pod.DesiredState.Host = ""
if err := r.CreateObj(makePodKey(pod.ID), &pod); err != nil { return r.CreateObj(makePodKey(pod.ID), &pod)
return err
}
// TODO: Until scheduler separation is completed, just assign here.
return r.assignPod(pod.ID, machine)
} }
// ApplyBinding implements binding's registry // ApplyBinding implements binding's registry
@ -119,23 +116,28 @@ func (r *Registry) ApplyBinding(binding *api.Binding) error {
return r.assignPod(binding.PodID, binding.Host) return r.assignPod(binding.PodID, binding.Host)
} }
// assignPod assigns the given pod to the given machine. // setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'.
// TODO: hook this up via apiserver, not by calling it from CreatePod(). // Returns the current state of the pod, or an error.
func (r *Registry) assignPod(podID string, machine string) error { func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
podKey := makePodKey(podID) podKey := makePodKey(podID)
var finalPod *api.Pod err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) {
err := r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*api.Pod) pod, ok := obj.(*api.Pod)
if !ok { if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj) return nil, fmt.Errorf("unexpected object: %#v", obj)
} }
if pod.DesiredState.Host != "" { if pod.DesiredState.Host != oldMachine {
return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.ID, pod.DesiredState.Host) return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.ID, pod.DesiredState.Host)
} }
pod.DesiredState.Host = machine pod.DesiredState.Host = machine
finalPod = pod finalPod = pod
return pod, nil return pod, nil
}) })
return finalPod, err
}
// assignPod assigns the given pod to the given machine.
func (r *Registry) assignPod(podID string, machine string) error {
finalPod, err := r.setPodHostTo(podID, "", machine)
if err != nil { if err != nil {
return err return err
} }
@ -148,14 +150,16 @@ func (r *Registry) assignPod(podID string, machine string) error {
err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList) manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest) manifests.Items = append(manifests.Items, manifest)
if !constraint.Allowed(manifests.Items) {
return nil, fmt.Errorf("The assignment would cause a constraint violation")
}
return manifests, nil return manifests, nil
}) })
if err != nil { if err != nil {
// Don't strand stuff. This is a terrible hack that won't be needed // Put the pod's host back the way it was. This is a terrible hack that
// when the above TODO is fixed. // won't be needed if we convert this to a rectification loop.
err2 := r.Delete(podKey, false) if _, err2 := r.setPodHostTo(podID, machine, ""); err2 != nil {
if err2 != nil { glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
} }
} }
return err return err

View File

@ -78,7 +78,7 @@ func TestEtcdCreatePod(t *testing.T) {
} }
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreatePod("machine", api.Pod{ err := registry.CreatePod(api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -93,7 +93,13 @@ func TestEtcdCreatePod(t *testing.T) {
}, },
}) })
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
} }
resp, err := fakeClient.Get("/registry/pods/foo", false, false) resp, err := fakeClient.Get("/registry/pods/foo", false, false)
@ -132,7 +138,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
E: nil, E: nil,
} }
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreatePod("machine", api.Pod{ err := registry.CreatePod(api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -158,20 +164,27 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
E: tools.EtcdErrorValueRequired, E: tools.EtcdErrorValueRequired,
} }
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreatePod("machine", api.Pod{ err := registry.CreatePod(api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
}) })
if err == nil { if err != nil {
t.Fatalf("Unexpected non-error") t.Fatalf("Unexpected error: %v", err)
} }
_, err = fakeClient.Get("/registry/pods/foo", false, false)
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
if err == nil { if err == nil {
t.Error("Unexpected non-error") t.Fatalf("Unexpected non error.")
} }
if !tools.IsEtcdNotFound(err) {
t.Errorf("Unexpected error: %#v", err) existingPod, err := registry.GetPod("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if existingPod.DesiredState.Host == "machine" {
t.Fatal("Pod's host changed in response to an unappliable binding.")
} }
} }
@ -191,7 +204,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreatePod("machine", api.Pod{ err := registry.CreatePod(api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -210,6 +223,12 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get("/registry/pods/foo", false, false) resp, err := fakeClient.Get("/registry/pods/foo", false, false)
if err != nil { if err != nil {
t.Fatalf("Unexpected error %v", err) t.Fatalf("Unexpected error %v", err)
@ -250,7 +269,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
}, },
}), 0) }), 0)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreatePod("machine", api.Pod{ err := registry.CreatePod(api.Pod{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
ID: "foo", ID: "foo",
}, },
@ -266,7 +285,13 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
}, },
}) })
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
} }
resp, err := fakeClient.Get("/registry/pods/foo", false, false) resp, err := fakeClient.Get("/registry/pods/foo", false, false)

View File

@ -30,8 +30,8 @@ type Registry interface {
WatchPods(resourceVersion uint64) (watch.Interface, error) WatchPods(resourceVersion uint64) (watch.Interface, error)
// Get a specific pod // Get a specific pod
GetPod(podID string) (*api.Pod, error) GetPod(podID string) (*api.Pod, error)
// Create a pod based on a specification, schedule it onto a specific machine. // Create a pod based on a specification.
CreatePod(machine string, pod api.Pod) error CreatePod(pod api.Pod) error
// Update an existing pod // Update an existing pod
UpdatePod(pod api.Pod) error UpdatePod(pod api.Pod) error
// Delete an existing pod // Delete an existing pod

View File

@ -27,7 +27,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -39,33 +38,27 @@ import (
type RegistryStorage struct { type RegistryStorage struct {
cloudProvider cloudprovider.Interface cloudProvider cloudprovider.Interface
mu sync.Mutex mu sync.Mutex
minionLister scheduler.MinionLister
podCache client.PodInfoGetter podCache client.PodInfoGetter
podInfoGetter client.PodInfoGetter podInfoGetter client.PodInfoGetter
podPollPeriod time.Duration podPollPeriod time.Duration
registry Registry registry Registry
scheduler scheduler.Scheduler
} }
type RegistryStorageConfig struct { type RegistryStorageConfig struct {
CloudProvider cloudprovider.Interface CloudProvider cloudprovider.Interface
MinionLister scheduler.MinionLister
PodCache client.PodInfoGetter PodCache client.PodInfoGetter
PodInfoGetter client.PodInfoGetter PodInfoGetter client.PodInfoGetter
Registry Registry Registry Registry
Scheduler scheduler.Scheduler
} }
// NewRegistryStorage returns a new RegistryStorage. // NewRegistryStorage returns a new RegistryStorage.
func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage { func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage {
return &RegistryStorage{ return &RegistryStorage{
cloudProvider: config.CloudProvider, cloudProvider: config.CloudProvider,
minionLister: config.MinionLister,
podCache: config.PodCache, podCache: config.PodCache,
podInfoGetter: config.PodInfoGetter, podInfoGetter: config.PodInfoGetter,
podPollPeriod: time.Second * 10, podPollPeriod: time.Second * 10,
registry: config.Registry, registry: config.Registry,
scheduler: config.Scheduler,
} }
} }
@ -82,7 +75,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
pod.CreationTimestamp = util.Now() pod.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (interface{}, error) { return apiserver.MakeAsync(func() (interface{}, error) {
if err := rs.scheduleAndCreatePod(*pod); err != nil { if err := rs.registry.CreatePod(*pod); err != nil {
return nil, err return nil, err
} }
return rs.registry.GetPod(pod.ID) return rs.registry.GetPod(pod.ID)
@ -133,10 +126,11 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u
pod := e.Object.(*api.Pod) pod := e.Object.(*api.Pod)
fields := labels.Set{ fields := labels.Set{
"ID": pod.ID, "ID": pod.ID,
"DesiredState.Status": string(pod.CurrentState.Status), "DesiredState.Status": string(pod.DesiredState.Status),
"DesiredState.Host": pod.CurrentState.Host, "DesiredState.Host": pod.DesiredState.Host,
} }
return e, label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) passesFilter := label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
return e, passesFilter
}), nil }), nil
} }
@ -158,6 +152,10 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
} }
func (rs *RegistryStorage) fillPodInfo(pod *api.Pod) { func (rs *RegistryStorage) fillPodInfo(pod *api.Pod) {
pod.CurrentState.Host = pod.DesiredState.Host
if pod.CurrentState.Host == "" {
return
}
// Get cached info for the list currently. // Get cached info for the list currently.
// TODO: Optionally use fresh info // TODO: Optionally use fresh info
if rs.podCache != nil { if rs.podCache != nil {
@ -240,17 +238,6 @@ func getPodStatus(pod *api.Pod) api.PodStatus {
} }
} }
func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error {
rs.mu.Lock()
defer rs.mu.Unlock()
// TODO(lavalamp): Separate scheduler more cleanly.
machine, err := rs.scheduler.Schedule(pod, rs.minionLister)
if err != nil {
return err
}
return rs.registry.CreatePod(machine, pod)
}
func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) {
for { for {
podObj, err := rs.Get(pod.ID) podObj, err := rs.Get(pod.ID)

View File

@ -25,9 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
) )
@ -58,7 +56,6 @@ func TestCreatePodRegistryError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil) podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error") podRegistry.Err = fmt.Errorf("test error")
storage := RegistryStorage{ storage := RegistryStorage{
scheduler: &registrytest.Scheduler{},
registry: podRegistry, registry: podRegistry,
} }
desiredState := api.PodState{ desiredState := api.PodState{
@ -74,31 +71,10 @@ func TestCreatePodRegistryError(t *testing.T) {
expectApiStatusError(t, ch, podRegistry.Err.Error()) expectApiStatusError(t, ch, podRegistry.Err.Error())
} }
func TestCreatePodSchedulerError(t *testing.T) {
mockScheduler := registrytest.Scheduler{
Err: fmt.Errorf("test error"),
}
storage := RegistryStorage{
scheduler: &mockScheduler,
}
desiredState := api.PodState{
Manifest: api.ContainerManifest{
Version: "v1beta1",
},
}
pod := &api.Pod{DesiredState: desiredState}
ch, err := storage.Create(pod)
if err != nil {
t.Errorf("Expected %#v, Got %#v", nil, err)
}
expectApiStatusError(t, ch, mockScheduler.Err.Error())
}
func TestCreatePodSetsIds(t *testing.T) { func TestCreatePodSetsIds(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil) podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error") podRegistry.Err = fmt.Errorf("test error")
storage := RegistryStorage{ storage := RegistryStorage{
scheduler: &registrytest.Scheduler{Machine: "test"},
registry: podRegistry, registry: podRegistry,
} }
desiredState := api.PodState{ desiredState := api.PodState{
@ -347,7 +323,6 @@ func TestPodStorageValidatesCreate(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil) podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error") podRegistry.Err = fmt.Errorf("test error")
storage := RegistryStorage{ storage := RegistryStorage{
scheduler: &registrytest.Scheduler{Machine: "test"},
registry: podRegistry, registry: podRegistry,
} }
pod := &api.Pod{} pod := &api.Pod{}
@ -364,7 +339,6 @@ func TestPodStorageValidatesUpdate(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil) podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error") podRegistry.Err = fmt.Errorf("test error")
storage := RegistryStorage{ storage := RegistryStorage{
scheduler: &registrytest.Scheduler{Machine: "test"},
registry: podRegistry, registry: podRegistry,
} }
pod := &api.Pod{} pod := &api.Pod{}
@ -388,8 +362,6 @@ func TestCreatePod(t *testing.T) {
storage := RegistryStorage{ storage := RegistryStorage{
registry: podRegistry, registry: podRegistry,
podPollPeriod: time.Millisecond * 100, podPollPeriod: time.Millisecond * 100,
scheduler: scheduler.NewRoundRobinScheduler(),
minionLister: minion.NewRegistry([]string{"machine"}),
} }
desiredState := api.PodState{ desiredState := api.PodState{
Manifest: api.ContainerManifest{ Manifest: api.ContainerManifest{
@ -438,7 +410,7 @@ func TestFillPodInfo(t *testing.T) {
storage := RegistryStorage{ storage := RegistryStorage{
podCache: &fakeGetter, podCache: &fakeGetter,
} }
pod := api.Pod{} pod := api.Pod{DesiredState: api.PodState{Host: "foo"}}
storage.fillPodInfo(&pod) storage.fillPodInfo(&pod)
if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) {
t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info)
@ -461,7 +433,7 @@ func TestFillPodInfoNoData(t *testing.T) {
storage := RegistryStorage{ storage := RegistryStorage{
podCache: &fakeGetter, podCache: &fakeGetter,
} }
pod := api.Pod{} pod := api.Pod{DesiredState: api.PodState{Host: "foo"}}
storage.fillPodInfo(&pod) storage.fillPodInfo(&pod)
if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) {
t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info)

View File

@ -26,7 +26,6 @@ import (
type PodRegistry struct { type PodRegistry struct {
Err error Err error
Machine string
Pod *api.Pod Pod *api.Pod
Pods []api.Pod Pods []api.Pod
sync.Mutex sync.Mutex
@ -66,10 +65,9 @@ func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) {
return r.Pod, r.Err return r.Pod, r.Err
} }
func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error { func (r *PodRegistry) CreatePod(pod api.Pod) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.Machine = machine
r.Pod = &pod r.Pod = &pod
r.mux.Action(watch.Added, &pod) r.mux.Action(watch.Added, &pod)
return r.Err return r.Err

View File

@ -69,7 +69,14 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
Algorithm: algo, Algorithm: algo,
Binder: &binder{factory.Client}, Binder: &binder{factory.Client},
NextPod: func() *api.Pod { NextPod: func() *api.Pod {
return podQueue.Pop().(*api.Pod) pod := podQueue.Pop().(*api.Pod)
// TODO: Remove or reduce verbosity by sep 6th, 2014. Leave until then to
// make it easy to find scheduling problems.
glog.Infof("About to try and schedule pod %v\n"+
"\tknown minions: %v\n"+
"\tknown scheduled pods: %v\n",
pod.ID, minionCache.Contains(), podCache.Contains())
return pod
}, },
Error: factory.makeDefaultErrorFunc(podQueue), Error: factory.makeDefaultErrorFunc(podQueue),
} }
@ -193,5 +200,8 @@ type binder struct {
// Bind just does a POST binding RPC. // Bind just does a POST binding RPC.
func (b *binder) Bind(binding *api.Binding) error { func (b *binder) Bind(binding *api.Binding) error {
// TODO: Remove or reduce verbosity by sep 6th, 2014. Leave until then to
// make it easy to find scheduling problems.
glog.Infof("Attempting to bind %v to %v", binding.PodID, binding.Host)
return b.Post().Path("bindings").Body(binding).Do().Error() return b.Post().Path("bindings").Body(binding).Do().Error()
} }