Merge pull request #557 from lavalamp/podLocation

Prepare for external scheduler
This commit is contained in:
brendandburns 2014-08-11 15:27:24 -07:00
commit 3222f61bca
9 changed files with 184 additions and 132 deletions

View File

@ -208,9 +208,12 @@ type PodStatus string
// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting PodStatus = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
PodPending PodStatus = "Pending"
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.
PodTerminated PodStatus = "Terminated"
)
// PodInfo contains one entry for every container with available info.

View File

@ -211,9 +211,12 @@ type PodStatus string
// These are the valid statuses of pods.
const (
// PodWaiting means that we're waiting for the pod to begin running.
PodWaiting PodStatus = "Waiting"
// PodRunning means that the pod is up and running.
PodRunning PodStatus = "Running"
PodPending PodStatus = "Pending"
PodStopped PodStatus = "Stopped"
// PodTerminated means that the pod has stopped.
PodTerminated PodStatus = "Terminated"
)
// PodInfo contains one entry for every container with available info.

View File

@ -130,7 +130,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
for _, value := range pods {
if api.PodStopped != value.CurrentState.Status {
if api.PodTerminated != value.CurrentState.Status {
result = append(result, value)
}
}

View File

@ -76,13 +76,13 @@ func (p *PodCache) updatePodInfo(host, id string) error {
func (p *PodCache) UpdateAllContainers() {
pods, err := p.pods.ListPods(labels.Everything())
if err != nil {
glog.Errorf("Error synchronizing container list: %#v", err)
glog.Errorf("Error synchronizing container list: %v", err)
return
}
for _, pod := range pods {
err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
if err != nil && err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error synchronizing container: %#v", err)
glog.Errorf("Error synchronizing container: %v", err)
}
}
}

View File

@ -33,7 +33,6 @@ import (
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
type EtcdRegistry struct {
helper tools.EtcdHelper
machines MinionRegistry
manifestFactory ManifestFactory
}
@ -43,8 +42,7 @@ type EtcdRegistry struct {
// 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
machines: machines,
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
}
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: registry,
@ -52,37 +50,42 @@ func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdReg
return registry
}
func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID
func makePodKey(podID string) string {
return "/registry/pods/" + podID
}
// ListPods obtains a list of pods that match selector.
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
pods := []api.Pod{}
machines, err := registry.machines.List()
allPods := []api.Pod{}
filteredPods := []api.Pod{}
err := registry.helper.ExtractList("/registry/pods", &allPods)
if err != nil {
return nil, err
}
for _, machine := range machines {
var machinePods []api.Pod
err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil {
return pods, err
}
for _, pod := range machinePods {
if selector.Matches(labels.Set(pod.Labels)) {
pod.CurrentState.Host = machine
pods = append(pods, pod)
}
for _, pod := range allPods {
if selector.Matches(labels.Set(pod.Labels)) {
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
// matches our desires.
pod.CurrentState.Host = pod.DesiredState.Host
filteredPods = append(filteredPods, pod)
}
}
return pods, nil
return filteredPods, nil
}
// GetPod gets a specific pod specified by its ID.
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
pod, _, err := registry.findPod(podID)
return &pod, err
var pod api.Pod
err := registry.helper.ExtractObj(makePodKey(podID), &pod, false)
if err != nil {
return nil, err
}
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
// matches our desires.
pod.CurrentState.Host = pod.DesiredState.Host
return &pod, nil
}
func makeContainerKey(machine string) string {
@ -90,32 +93,65 @@ func makeContainerKey(machine string) string {
}
// CreatePod creates a pod based on a specification, schedule it onto a specific machine.
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
podOut, machine, err := registry.findPod(pod.ID)
if err == nil {
// TODO: this error message looks racy.
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error {
// Set current status to "Waiting".
pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = ""
// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
pod.DesiredState.Status = api.PodRunning
pod.DesiredState.Host = ""
err := registry.helper.CreateObj(makePodKey(pod.ID), &pod)
if err != nil {
return err
}
return registry.runPod(pod, machineIn)
// TODO: Until scheduler separation is completed, just assign here.
return registry.AssignPod(pod.ID, machine)
}
func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
podKey := makePodKey(machine, pod.ID)
err := registry.helper.SetObj(podKey, pod)
// AssignPod assigns the given pod to the given machine.
// TODO: hook this up via apiserver, not by calling it from CreatePod().
func (registry *EtcdRegistry) AssignPod(podID string, machine string) error {
podKey := makePodKey(podID)
var finalPod *api.Pod
err := registry.helper.AtomicUpdate(
podKey,
&api.Pod{},
func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
pod.DesiredState.Host = machine
finalPod = pod
return pod, nil
},
)
if err != nil {
return err
}
manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
// TODO: move this to a watch/rectification loop.
manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod)
if err != nil {
return err
}
contKey := makeContainerKey(machine)
err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
})
err = registry.helper.AtomicUpdate(
contKey,
&api.ContainerManifestList{},
func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
},
)
if err != nil {
// Don't strand stuff.
// Don't strand stuff. This is a terrible hack that won't be needed
// when the above TODO is fixed.
err2 := registry.helper.Delete(podKey, false)
if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
@ -130,18 +166,9 @@ func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
// DeletePod deletes an existing pod specified by its ID.
func (registry *EtcdRegistry) DeletePod(podID string) error {
_, machine, err := registry.findPod(podID)
if err != nil {
return err
}
return registry.deletePodFromMachine(machine, podID)
}
func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
podKey := makePodKey(machine, podID)
err := registry.helper.Delete(podKey, true)
var pod api.Pod
podKey := makePodKey(podID)
err := registry.helper.ExtractObj(podKey, &pod, false)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
@ -149,6 +176,22 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
return err
}
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
err = registry.helper.Delete(podKey, true)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}
machine := pod.DesiredState.Host
if machine == "" {
// Pod was never scheduled anywhere, just return.
return nil
}
// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
@ -173,30 +216,6 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
})
}
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
key := makePodKey(machine, podID)
err = registry.helper.ExtractObj(key, &pod, false)
if err != nil {
return
}
pod.CurrentState.Host = machine
return
}
func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
machines, err := registry.machines.List()
if err != nil {
return api.Pod{}, "", err
}
for _, machine := range machines {
pod, err := registry.getPodForMachine(machine, podID)
if err == nil {
return pod, machine, nil
}
}
return api.Pod{}, "", apiserver.NewNotFoundErr("pod", podID)
}
// ListControllers obtains a list of ReplicationControllers.
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController

View File

@ -24,7 +24,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
@ -38,7 +37,7 @@ func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegis
func TestEtcdGetPod(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/pods/foo", api.EncodeOrDie(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
pod, err := registry.GetPod("foo")
if err != nil {
@ -52,7 +51,7 @@ func TestEtcdGetPod(t *testing.T) {
func TestEtcdGetPodNotFound(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -68,7 +67,7 @@ func TestEtcdGetPodNotFound(t *testing.T) {
func TestEtcdCreatePod(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -94,7 +93,7 @@ func TestEtcdCreatePod(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
@ -121,10 +120,10 @@ func TestEtcdCreatePod(t *testing.T) {
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
Value: api.EncodeOrDie(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
},
},
E: nil,
@ -142,7 +141,8 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
func TestEtcdCreatePodWithContainersError(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
fakeClient.TestIndex = true
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -161,9 +161,9 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
},
})
if err == nil {
t.Error("Unexpected non-error")
t.Fatalf("Unexpected non-error")
}
_, err = fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
_, err = fakeClient.Get("/registry/pods/foo", false, false)
if err == nil {
t.Error("Unexpected non-error")
}
@ -174,7 +174,8 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
fakeClient.TestIndex = true
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -203,10 +204,10 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
@ -234,7 +235,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -265,7 +266,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
@ -294,8 +295,11 @@ func TestEtcdDeletePod(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
key := "/registry/pods/foo"
fakeClient.Set(key, api.EncodeOrDie(api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
@ -327,8 +331,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
key := "/registry/pods/foo"
fakeClient.Set(key, api.EncodeOrDie(api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
@ -363,7 +370,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
func TestEtcdEmptyListPods(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods"
key := "/registry/pods"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -385,7 +392,7 @@ func TestEtcdEmptyListPods(t *testing.T) {
func TestEtcdListPodsNotFound(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods"
key := "/registry/pods"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
@ -403,16 +410,22 @@ func TestEtcdListPodsNotFound(t *testing.T) {
func TestEtcdListPods(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods"
key := "/registry/pods"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
Value: api.EncodeOrDie(api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}),
},
{
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "bar"}}),
Value: api.EncodeOrDie(api.Pod{
JSONBase: api.JSONBase{ID: "bar"},
DesiredState: api.PodState{Host: "machine"},
}),
},
},
},
@ -478,10 +491,10 @@ func TestEtcdListControllers(t *testing.T) {
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}),
Value: api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}),
},
{
Value: util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "bar"}}),
Value: api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "bar"}}),
},
},
},
@ -501,7 +514,7 @@ func TestEtcdListControllers(t *testing.T) {
func TestEtcdGetController(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
ctrl, err := registry.GetController("foo")
if err != nil {
@ -577,7 +590,7 @@ func TestEtcdCreateController(t *testing.T) {
func TestEtcdCreateControllerAlreadyExisting(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateController(api.ReplicationController{
@ -594,7 +607,7 @@ func TestEtcdUpdateController(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
resp, _ := fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
resp, _ := fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.UpdateController(api.ReplicationController{
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
@ -620,10 +633,10 @@ func TestEtcdListServices(t *testing.T) {
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}),
Value: api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}),
},
{
Value: util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "bar"}}),
Value: api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "bar"}}),
},
},
},
@ -669,7 +682,7 @@ func TestEtcdCreateService(t *testing.T) {
func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateService(api.Service{
JSONBase: api.JSONBase{ID: "foo"},
@ -681,7 +694,7 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
func TestEtcdGetService(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
service, err := registry.GetService("foo")
if err != nil {
@ -733,7 +746,7 @@ func TestEtcdUpdateService(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
resp, _ := fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
resp, _ := fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
testService := api.Service{
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
@ -771,7 +784,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
Endpoints: []string{"baz", "bar"},
}
fakeClient.Set("/registry/services/endpoints/foo", util.MakeJSONString(api.Endpoints{}), 0)
fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{}), 0)
err := registry.UpdateEndpoints(endpoints)
if err != nil {

View File

@ -116,8 +116,8 @@ func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) {
}
func makePodStatus(pod *api.Pod) api.PodStatus {
if pod.CurrentState.Info == nil {
return api.PodPending
if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" {
return api.PodWaiting
}
running := 0
stopped := 0
@ -138,11 +138,11 @@ func makePodStatus(pod *api.Pod) api.PodStatus {
case running > 0 && stopped == 0 && unknown == 0:
return api.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
return api.PodStopped
return api.PodTerminated
case running == 0 && stopped == 0 && unknown > 0:
return api.PodPending
return api.PodWaiting
default:
return api.PodPending
return api.PodWaiting
}
}
@ -251,7 +251,7 @@ func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{},
return nil, fmt.Errorf("Error %#v is not an api.Pod!", podObj)
}
switch podPtr.CurrentState.Status {
case api.PodRunning, api.PodStopped:
case api.PodRunning, api.PodTerminated:
return pod, nil
default:
time.Sleep(storage.podPollPeriod)

View File

@ -288,10 +288,13 @@ func TestMakePodStatus(t *testing.T) {
},
},
}
pod := &api.Pod{DesiredState: desiredState}
currentState := api.PodState{
Host: "machine",
}
pod := &api.Pod{DesiredState: desiredState, CurrentState: currentState}
status := makePodStatus(pod)
if status != api.PodPending {
t.Errorf("Expected 'Pending', got '%s'", status)
if status != api.PodWaiting {
t.Errorf("Expected 'Waiting', got '%s'", status)
}
runningState := docker.Container{
@ -313,6 +316,7 @@ func TestMakePodStatus(t *testing.T) {
"containerA": runningState,
"containerB": runningState,
},
Host: "machine",
},
}
status = makePodStatus(pod)
@ -328,11 +332,12 @@ func TestMakePodStatus(t *testing.T) {
"containerA": stoppedState,
"containerB": stoppedState,
},
Host: "machine",
},
}
status = makePodStatus(pod)
if status != api.PodStopped {
t.Errorf("Expected 'Stopped', got '%s'", status)
if status != api.PodTerminated {
t.Errorf("Expected 'Terminated', got '%s'", status)
}
// Mixed state.
@ -343,11 +348,12 @@ func TestMakePodStatus(t *testing.T) {
"containerA": runningState,
"containerB": stoppedState,
},
Host: "machine",
},
}
status = makePodStatus(pod)
if status != api.PodPending {
t.Errorf("Expected 'Pending', got '%s'", status)
if status != api.PodWaiting {
t.Errorf("Expected 'Waiting', got '%s'", status)
}
// Mixed state.
@ -357,11 +363,12 @@ func TestMakePodStatus(t *testing.T) {
Info: map[string]docker.Container{
"containerA": runningState,
},
Host: "machine",
},
}
status = makePodStatus(pod)
if status != api.PodPending {
t.Errorf("Expected 'Pending', got '%s'", status)
if status != api.PodWaiting {
t.Errorf("Expected 'Waiting', got '%s'", status)
}
}
@ -406,7 +413,7 @@ func TestCreatePod(t *testing.T) {
pod: &api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
CurrentState: api.PodState{
Status: api.PodPending,
Host: "machine",
},
},
}

View File

@ -88,6 +88,7 @@ func (f *FakeEtcdClient) generateIndex() uint64 {
}
f.ChangeIndex++
f.t.Logf("generating index %v", f.ChangeIndex)
return f.ChangeIndex
}
@ -116,7 +117,7 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response,
func (f *FakeEtcdClient) nodeExists(key string) bool {
result, ok := f.Data[key]
return ok && result.R != nil && result.R.Node != nil
return ok && result.R != nil && result.R.Node != nil && result.E == nil
}
func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Response, error) {
@ -129,6 +130,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
if f.nodeExists(key) {
prevResult := f.Data[key]
createdIndex := prevResult.R.Node.CreatedIndex
f.t.Logf("updating %v, index %v -> %v", key, createdIndex, i)
result := EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -142,6 +144,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
return result.R, nil
}
f.t.Logf("creating %v, index %v", key, i)
result := EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -164,6 +167,7 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err
func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
if f.Err != nil {
f.t.Logf("c&s: returning err %v", f.Err)
return nil, f.Err
}
@ -180,16 +184,19 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue
defer f.Mutex.Unlock()
if !f.nodeExists(key) {
f.t.Logf("c&s: node doesn't exist")
return nil, EtcdErrorNotFound
}
prevNode := f.Data[key].R.Node
if prevValue != "" && prevValue != prevNode.Value {
f.t.Logf("body didn't match")
return nil, EtcdErrorTestFailed
}
if prevIndex != 0 && prevIndex != prevNode.ModifiedIndex {
f.t.Logf("got index %v but needed %v", prevIndex, prevNode.ModifiedIndex)
return nil, EtcdErrorTestFailed
}