mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
Prepare for external scheduler
1. Change names of Pod statuses (Waiting, Running, Terminated). 2. Store assigned host in etcd. 3. Change pod key to /registry/pods/<podid>. Container location remains the same (/registry/hosts/<machine>/kublet).
This commit is contained in:
parent
e35dfedd79
commit
5cdce0e35a
@ -208,9 +208,12 @@ type PodStatus string
|
|||||||
|
|
||||||
// These are the valid statuses of pods.
|
// These are the valid statuses of pods.
|
||||||
const (
|
const (
|
||||||
|
// PodWaiting means that we're waiting for the pod to begin running.
|
||||||
|
PodWaiting = "Waiting"
|
||||||
|
// PodRunning means that the pod is up and running.
|
||||||
PodRunning PodStatus = "Running"
|
PodRunning PodStatus = "Running"
|
||||||
PodPending PodStatus = "Pending"
|
// PodTerminated means that the pod has stopped.
|
||||||
PodStopped PodStatus = "Stopped"
|
PodTerminated PodStatus = "Terminated"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PodInfo contains one entry for every container with available info.
|
// PodInfo contains one entry for every container with available info.
|
||||||
|
@ -211,9 +211,12 @@ type PodStatus string
|
|||||||
|
|
||||||
// These are the valid statuses of pods.
|
// These are the valid statuses of pods.
|
||||||
const (
|
const (
|
||||||
|
// PodWaiting means that we're waiting for the pod to begin running.
|
||||||
|
PodWaiting = "Waiting"
|
||||||
|
// PodRunning means that the pod is up and running.
|
||||||
PodRunning PodStatus = "Running"
|
PodRunning PodStatus = "Running"
|
||||||
PodPending PodStatus = "Pending"
|
// PodTerminated means that the pod has stopped.
|
||||||
PodStopped PodStatus = "Stopped"
|
PodTerminated PodStatus = "Terminated"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PodInfo contains one entry for every container with available info.
|
// PodInfo contains one entry for every container with available info.
|
||||||
|
@ -130,7 +130,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
|
|||||||
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
|
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
|
||||||
var result []api.Pod
|
var result []api.Pod
|
||||||
for _, value := range pods {
|
for _, value := range pods {
|
||||||
if api.PodStopped != value.CurrentState.Status {
|
if api.PodTerminated != value.CurrentState.Status {
|
||||||
result = append(result, value)
|
result = append(result, value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -76,13 +76,13 @@ func (p *PodCache) updatePodInfo(host, id string) error {
|
|||||||
func (p *PodCache) UpdateAllContainers() {
|
func (p *PodCache) UpdateAllContainers() {
|
||||||
pods, err := p.pods.ListPods(labels.Everything())
|
pods, err := p.pods.ListPods(labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error synchronizing container list: %#v", err)
|
glog.Errorf("Error synchronizing container list: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
|
err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
|
||||||
if err != nil && err != client.ErrPodInfoNotAvailable {
|
if err != nil && err != client.ErrPodInfoNotAvailable {
|
||||||
glog.Errorf("Error synchronizing container: %#v", err)
|
glog.Errorf("Error synchronizing container: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,6 @@ import (
|
|||||||
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
|
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
|
||||||
type EtcdRegistry struct {
|
type EtcdRegistry struct {
|
||||||
helper tools.EtcdHelper
|
helper tools.EtcdHelper
|
||||||
machines MinionRegistry
|
|
||||||
manifestFactory ManifestFactory
|
manifestFactory ManifestFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,7 +43,6 @@ type EtcdRegistry struct {
|
|||||||
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
|
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
|
||||||
registry := &EtcdRegistry{
|
registry := &EtcdRegistry{
|
||||||
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
|
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
|
||||||
machines: machines,
|
|
||||||
}
|
}
|
||||||
registry.manifestFactory = &BasicManifestFactory{
|
registry.manifestFactory = &BasicManifestFactory{
|
||||||
serviceRegistry: registry,
|
serviceRegistry: registry,
|
||||||
@ -52,37 +50,34 @@ func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdReg
|
|||||||
return registry
|
return registry
|
||||||
}
|
}
|
||||||
|
|
||||||
func makePodKey(machine, podID string) string {
|
func makePodKey(podID string) string {
|
||||||
return "/registry/hosts/" + machine + "/pods/" + podID
|
return "/registry/pods/" + podID
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListPods obtains a list of pods that match selector.
|
// ListPods obtains a list of pods that match selector.
|
||||||
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
|
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
|
||||||
pods := []api.Pod{}
|
allPods := []api.Pod{}
|
||||||
machines, err := registry.machines.List()
|
filteredPods := []api.Pod{}
|
||||||
|
err := registry.helper.ExtractList("/registry/pods", &allPods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, machine := range machines {
|
for _, pod := range allPods {
|
||||||
var machinePods []api.Pod
|
|
||||||
err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
|
|
||||||
if err != nil {
|
|
||||||
return pods, err
|
|
||||||
}
|
|
||||||
for _, pod := range machinePods {
|
|
||||||
if selector.Matches(labels.Set(pod.Labels)) {
|
if selector.Matches(labels.Set(pod.Labels)) {
|
||||||
pod.CurrentState.Host = machine
|
filteredPods = append(filteredPods, pod)
|
||||||
pods = append(pods, pod)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
return filteredPods, nil
|
||||||
return pods, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPod gets a specific pod specified by its ID.
|
// GetPod gets a specific pod specified by its ID.
|
||||||
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
|
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
|
||||||
pod, _, err := registry.findPod(podID)
|
var pod api.Pod
|
||||||
return &pod, err
|
err := registry.helper.ExtractObj(makePodKey(podID), &pod, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &pod, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeContainerKey(machine string) string {
|
func makeContainerKey(machine string) string {
|
||||||
@ -90,32 +85,69 @@ func makeContainerKey(machine string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreatePod creates a pod based on a specification, schedule it onto a specific machine.
|
// CreatePod creates a pod based on a specification, schedule it onto a specific machine.
|
||||||
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
|
func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error {
|
||||||
podOut, machine, err := registry.findPod(pod.ID)
|
// TODO: When our client supports it, switch to atomic creates.
|
||||||
|
var pod2 api.Pod
|
||||||
|
err := registry.helper.ExtractObj(makePodKey(pod.ID), &pod2, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// TODO: this error message looks racy.
|
return fmt.Errorf("a pod named %s already exists (%#v)", pod.ID, pod2)
|
||||||
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
|
|
||||||
}
|
|
||||||
return registry.runPod(pod, machineIn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
// Set status to "Waiting".
|
||||||
podKey := makePodKey(machine, pod.ID)
|
pod.CurrentState.Status = api.PodWaiting
|
||||||
err := registry.helper.SetObj(podKey, pod)
|
pod.CurrentState.Host = ""
|
||||||
|
|
||||||
manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
|
err = registry.helper.SetObj(makePodKey(pod.ID), &pod)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Until scheduler separation is completed, just assign here.
|
||||||
|
return registry.AssignPod(pod.ID, machine)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssignPod assigns the given pod to the given machine.
|
||||||
|
// TODO: hook this up via apiserver, not by calling it from CreatePod().
|
||||||
|
func (registry *EtcdRegistry) AssignPod(podID string, machine string) error {
|
||||||
|
podKey := makePodKey(podID)
|
||||||
|
var finalPod *api.Pod
|
||||||
|
err := registry.helper.AtomicUpdate(
|
||||||
|
podKey,
|
||||||
|
&api.Pod{},
|
||||||
|
func(obj interface{}) (interface{}, error) {
|
||||||
|
pod, ok := obj.(*api.Pod)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
||||||
|
}
|
||||||
|
pod.CurrentState.Host = machine
|
||||||
|
pod.CurrentState.Status = api.PodWaiting
|
||||||
|
finalPod = pod
|
||||||
|
return pod, nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: move this to a watch/rectification loop.
|
||||||
|
manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
contKey := makeContainerKey(machine)
|
contKey := makeContainerKey(machine)
|
||||||
err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
|
err = registry.helper.AtomicUpdate(
|
||||||
|
contKey,
|
||||||
|
&api.ContainerManifestList{},
|
||||||
|
func(in interface{}) (interface{}, error) {
|
||||||
manifests := *in.(*api.ContainerManifestList)
|
manifests := *in.(*api.ContainerManifestList)
|
||||||
manifests.Items = append(manifests.Items, manifest)
|
manifests.Items = append(manifests.Items, manifest)
|
||||||
return manifests, nil
|
return manifests, nil
|
||||||
})
|
},
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Don't strand stuff.
|
// Don't strand stuff. This is a terrible hack that won't be needed
|
||||||
|
// when the above TODO is fixed.
|
||||||
err2 := registry.helper.Delete(podKey, false)
|
err2 := registry.helper.Delete(podKey, false)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
|
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
|
||||||
@ -130,18 +162,9 @@ func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
|
|||||||
|
|
||||||
// DeletePod deletes an existing pod specified by its ID.
|
// DeletePod deletes an existing pod specified by its ID.
|
||||||
func (registry *EtcdRegistry) DeletePod(podID string) error {
|
func (registry *EtcdRegistry) DeletePod(podID string) error {
|
||||||
_, machine, err := registry.findPod(podID)
|
var pod api.Pod
|
||||||
if err != nil {
|
podKey := makePodKey(podID)
|
||||||
return err
|
err := registry.helper.ExtractObj(podKey, &pod, false)
|
||||||
}
|
|
||||||
return registry.deletePodFromMachine(machine, podID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
|
|
||||||
// First delete the pod, so a scheduler doesn't notice it getting removed from the
|
|
||||||
// machine and attempt to put it somewhere.
|
|
||||||
podKey := makePodKey(machine, podID)
|
|
||||||
err := registry.helper.Delete(podKey, true)
|
|
||||||
if tools.IsEtcdNotFound(err) {
|
if tools.IsEtcdNotFound(err) {
|
||||||
return apiserver.NewNotFoundErr("pod", podID)
|
return apiserver.NewNotFoundErr("pod", podID)
|
||||||
}
|
}
|
||||||
@ -149,6 +172,22 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First delete the pod, so a scheduler doesn't notice it getting removed from the
|
||||||
|
// machine and attempt to put it somewhere.
|
||||||
|
err = registry.helper.Delete(podKey, true)
|
||||||
|
if tools.IsEtcdNotFound(err) {
|
||||||
|
return apiserver.NewNotFoundErr("pod", podID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
machine := pod.CurrentState.Host
|
||||||
|
if machine == "" {
|
||||||
|
// Pod was never scheduled anywhere, just return.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Next, remove the pod from the machine atomically.
|
// Next, remove the pod from the machine atomically.
|
||||||
contKey := makeContainerKey(machine)
|
contKey := makeContainerKey(machine)
|
||||||
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
|
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
|
||||||
@ -173,30 +212,6 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
|
||||||
key := makePodKey(machine, podID)
|
|
||||||
err = registry.helper.ExtractObj(key, &pod, false)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pod.CurrentState.Host = machine
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
|
|
||||||
machines, err := registry.machines.List()
|
|
||||||
if err != nil {
|
|
||||||
return api.Pod{}, "", err
|
|
||||||
}
|
|
||||||
for _, machine := range machines {
|
|
||||||
pod, err := registry.getPodForMachine(machine, podID)
|
|
||||||
if err == nil {
|
|
||||||
return pod, machine, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return api.Pod{}, "", apiserver.NewNotFoundErr("pod", podID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListControllers obtains a list of ReplicationControllers.
|
// ListControllers obtains a list of ReplicationControllers.
|
||||||
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
|
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
|
||||||
var controllers []api.ReplicationController
|
var controllers []api.ReplicationController
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -38,7 +37,7 @@ func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegis
|
|||||||
|
|
||||||
func TestEtcdGetPod(t *testing.T) {
|
func TestEtcdGetPod(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
fakeClient.Set("/registry/pods/foo", api.EncodeOrDie(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
||||||
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
pod, err := registry.GetPod("foo")
|
pod, err := registry.GetPod("foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -52,7 +51,7 @@ func TestEtcdGetPod(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdGetPodNotFound(t *testing.T) {
|
func TestEtcdGetPodNotFound(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
|
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: nil,
|
Node: nil,
|
||||||
},
|
},
|
||||||
@ -68,7 +67,7 @@ func TestEtcdGetPodNotFound(t *testing.T) {
|
|||||||
func TestEtcdCreatePod(t *testing.T) {
|
func TestEtcdCreatePod(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
|
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: nil,
|
Node: nil,
|
||||||
},
|
},
|
||||||
@ -94,7 +93,7 @@ func TestEtcdCreatePod(t *testing.T) {
|
|||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
|
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -121,10 +120,10 @@ func TestEtcdCreatePod(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
|
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
|
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
|
Value: api.EncodeOrDie(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
E: nil,
|
E: nil,
|
||||||
@ -142,7 +141,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdCreatePodWithContainersError(t *testing.T) {
|
func TestEtcdCreatePodWithContainersError(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
|
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: nil,
|
Node: nil,
|
||||||
},
|
},
|
||||||
@ -163,7 +162,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("Unexpected non-error")
|
t.Error("Unexpected non-error")
|
||||||
}
|
}
|
||||||
_, err = fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
|
_, err = fakeClient.Get("/registry/pods/foo", false, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("Unexpected non-error")
|
t.Error("Unexpected non-error")
|
||||||
}
|
}
|
||||||
@ -174,7 +173,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
|
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: nil,
|
Node: nil,
|
||||||
},
|
},
|
||||||
@ -206,7 +205,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
|||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
|
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -234,7 +233,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
|||||||
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
|
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: nil,
|
Node: nil,
|
||||||
},
|
},
|
||||||
@ -265,7 +264,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
|||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
|
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -294,8 +293,11 @@ func TestEtcdDeletePod(t *testing.T) {
|
|||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
|
|
||||||
key := "/registry/hosts/machine/pods/foo"
|
key := "/registry/pods/foo"
|
||||||
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
fakeClient.Set(key, api.EncodeOrDie(api.Pod{
|
||||||
|
JSONBase: api.JSONBase{ID: "foo"},
|
||||||
|
CurrentState: api.PodState{Host: "machine"},
|
||||||
|
}), 0)
|
||||||
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
|
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
|
||||||
Items: []api.ContainerManifest{
|
Items: []api.ContainerManifest{
|
||||||
{ID: "foo"},
|
{ID: "foo"},
|
||||||
@ -327,8 +329,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
|
|||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
|
|
||||||
key := "/registry/hosts/machine/pods/foo"
|
key := "/registry/pods/foo"
|
||||||
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
fakeClient.Set(key, api.EncodeOrDie(api.Pod{
|
||||||
|
JSONBase: api.JSONBase{ID: "foo"},
|
||||||
|
CurrentState: api.PodState{Host: "machine"},
|
||||||
|
}), 0)
|
||||||
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
|
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
|
||||||
Items: []api.ContainerManifest{
|
Items: []api.ContainerManifest{
|
||||||
{ID: "foo"},
|
{ID: "foo"},
|
||||||
@ -363,7 +368,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdEmptyListPods(t *testing.T) {
|
func TestEtcdEmptyListPods(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
key := "/registry/hosts/machine/pods"
|
key := "/registry/pods"
|
||||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
@ -385,7 +390,7 @@ func TestEtcdEmptyListPods(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdListPodsNotFound(t *testing.T) {
|
func TestEtcdListPodsNotFound(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
key := "/registry/hosts/machine/pods"
|
key := "/registry/pods"
|
||||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{},
|
R: &etcd.Response{},
|
||||||
E: tools.EtcdErrorNotFound,
|
E: tools.EtcdErrorNotFound,
|
||||||
@ -403,16 +408,22 @@ func TestEtcdListPodsNotFound(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdListPods(t *testing.T) {
|
func TestEtcdListPods(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
key := "/registry/hosts/machine/pods"
|
key := "/registry/pods"
|
||||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||||
R: &etcd.Response{
|
R: &etcd.Response{
|
||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
Nodes: []*etcd.Node{
|
Nodes: []*etcd.Node{
|
||||||
{
|
{
|
||||||
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
|
Value: api.EncodeOrDie(api.Pod{
|
||||||
|
JSONBase: api.JSONBase{ID: "foo"},
|
||||||
|
CurrentState: api.PodState{Host: "machine"},
|
||||||
|
}),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "bar"}}),
|
Value: api.EncodeOrDie(api.Pod{
|
||||||
|
JSONBase: api.JSONBase{ID: "bar"},
|
||||||
|
CurrentState: api.PodState{Host: "machine"},
|
||||||
|
}),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -478,10 +489,10 @@ func TestEtcdListControllers(t *testing.T) {
|
|||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
Nodes: []*etcd.Node{
|
Nodes: []*etcd.Node{
|
||||||
{
|
{
|
||||||
Value: util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}),
|
Value: api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Value: util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "bar"}}),
|
Value: api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "bar"}}),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -501,7 +512,7 @@ func TestEtcdListControllers(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdGetController(t *testing.T) {
|
func TestEtcdGetController(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
||||||
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
ctrl, err := registry.GetController("foo")
|
ctrl, err := registry.GetController("foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -594,7 +605,7 @@ func TestEtcdUpdateController(t *testing.T) {
|
|||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
|
|
||||||
resp, _ := fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
resp, _ := fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
||||||
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
err := registry.UpdateController(api.ReplicationController{
|
err := registry.UpdateController(api.ReplicationController{
|
||||||
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
|
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
|
||||||
@ -620,10 +631,10 @@ func TestEtcdListServices(t *testing.T) {
|
|||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
Nodes: []*etcd.Node{
|
Nodes: []*etcd.Node{
|
||||||
{
|
{
|
||||||
Value: util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}),
|
Value: api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Value: util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "bar"}}),
|
Value: api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "bar"}}),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -681,7 +692,7 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
|
|||||||
|
|
||||||
func TestEtcdGetService(t *testing.T) {
|
func TestEtcdGetService(t *testing.T) {
|
||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
||||||
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
service, err := registry.GetService("foo")
|
service, err := registry.GetService("foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -733,7 +744,7 @@ func TestEtcdUpdateService(t *testing.T) {
|
|||||||
fakeClient := tools.MakeFakeEtcdClient(t)
|
fakeClient := tools.MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
|
|
||||||
resp, _ := fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
resp, _ := fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
|
||||||
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
testService := api.Service{
|
testService := api.Service{
|
||||||
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
|
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
|
||||||
|
@ -116,8 +116,8 @@ func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func makePodStatus(pod *api.Pod) api.PodStatus {
|
func makePodStatus(pod *api.Pod) api.PodStatus {
|
||||||
if pod.CurrentState.Info == nil {
|
if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" {
|
||||||
return api.PodPending
|
return api.PodWaiting
|
||||||
}
|
}
|
||||||
running := 0
|
running := 0
|
||||||
stopped := 0
|
stopped := 0
|
||||||
@ -138,11 +138,11 @@ func makePodStatus(pod *api.Pod) api.PodStatus {
|
|||||||
case running > 0 && stopped == 0 && unknown == 0:
|
case running > 0 && stopped == 0 && unknown == 0:
|
||||||
return api.PodRunning
|
return api.PodRunning
|
||||||
case running == 0 && stopped > 0 && unknown == 0:
|
case running == 0 && stopped > 0 && unknown == 0:
|
||||||
return api.PodStopped
|
return api.PodTerminated
|
||||||
case running == 0 && stopped == 0 && unknown > 0:
|
case running == 0 && stopped == 0 && unknown > 0:
|
||||||
return api.PodPending
|
return api.PodWaiting
|
||||||
default:
|
default:
|
||||||
return api.PodPending
|
return api.PodWaiting
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,7 +251,7 @@ func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{},
|
|||||||
return nil, fmt.Errorf("Error %#v is not an api.Pod!", podObj)
|
return nil, fmt.Errorf("Error %#v is not an api.Pod!", podObj)
|
||||||
}
|
}
|
||||||
switch podPtr.CurrentState.Status {
|
switch podPtr.CurrentState.Status {
|
||||||
case api.PodRunning, api.PodStopped:
|
case api.PodRunning, api.PodTerminated:
|
||||||
return pod, nil
|
return pod, nil
|
||||||
default:
|
default:
|
||||||
time.Sleep(storage.podPollPeriod)
|
time.Sleep(storage.podPollPeriod)
|
||||||
|
@ -288,10 +288,13 @@ func TestMakePodStatus(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
pod := &api.Pod{DesiredState: desiredState}
|
currentState := api.PodState{
|
||||||
|
Host: "machine",
|
||||||
|
}
|
||||||
|
pod := &api.Pod{DesiredState: desiredState, CurrentState: currentState}
|
||||||
status := makePodStatus(pod)
|
status := makePodStatus(pod)
|
||||||
if status != api.PodPending {
|
if status != api.PodWaiting {
|
||||||
t.Errorf("Expected 'Pending', got '%s'", status)
|
t.Errorf("Expected 'Waiting', got '%s'", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
runningState := docker.Container{
|
runningState := docker.Container{
|
||||||
@ -313,6 +316,7 @@ func TestMakePodStatus(t *testing.T) {
|
|||||||
"containerA": runningState,
|
"containerA": runningState,
|
||||||
"containerB": runningState,
|
"containerB": runningState,
|
||||||
},
|
},
|
||||||
|
Host: "machine",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
status = makePodStatus(pod)
|
status = makePodStatus(pod)
|
||||||
@ -328,11 +332,12 @@ func TestMakePodStatus(t *testing.T) {
|
|||||||
"containerA": stoppedState,
|
"containerA": stoppedState,
|
||||||
"containerB": stoppedState,
|
"containerB": stoppedState,
|
||||||
},
|
},
|
||||||
|
Host: "machine",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
status = makePodStatus(pod)
|
status = makePodStatus(pod)
|
||||||
if status != api.PodStopped {
|
if status != api.PodTerminated {
|
||||||
t.Errorf("Expected 'Stopped', got '%s'", status)
|
t.Errorf("Expected 'Terminated', got '%s'", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mixed state.
|
// Mixed state.
|
||||||
@ -343,11 +348,12 @@ func TestMakePodStatus(t *testing.T) {
|
|||||||
"containerA": runningState,
|
"containerA": runningState,
|
||||||
"containerB": stoppedState,
|
"containerB": stoppedState,
|
||||||
},
|
},
|
||||||
|
Host: "machine",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
status = makePodStatus(pod)
|
status = makePodStatus(pod)
|
||||||
if status != api.PodPending {
|
if status != api.PodWaiting {
|
||||||
t.Errorf("Expected 'Pending', got '%s'", status)
|
t.Errorf("Expected 'Waiting', got '%s'", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mixed state.
|
// Mixed state.
|
||||||
@ -357,11 +363,12 @@ func TestMakePodStatus(t *testing.T) {
|
|||||||
Info: map[string]docker.Container{
|
Info: map[string]docker.Container{
|
||||||
"containerA": runningState,
|
"containerA": runningState,
|
||||||
},
|
},
|
||||||
|
Host: "machine",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
status = makePodStatus(pod)
|
status = makePodStatus(pod)
|
||||||
if status != api.PodPending {
|
if status != api.PodWaiting {
|
||||||
t.Errorf("Expected 'Pending', got '%s'", status)
|
t.Errorf("Expected 'Waiting', got '%s'", status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -406,7 +413,7 @@ func TestCreatePod(t *testing.T) {
|
|||||||
pod: &api.Pod{
|
pod: &api.Pod{
|
||||||
JSONBase: api.JSONBase{ID: "foo"},
|
JSONBase: api.JSONBase{ID: "foo"},
|
||||||
CurrentState: api.PodState{
|
CurrentState: api.PodState{
|
||||||
Status: api.PodPending,
|
Host: "machine",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user