From 13d7a5959a15ceea459f35e2fa8ab47681cd19b1 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 27 Jun 2014 17:43:36 -0700 Subject: [PATCH] Add sync behavior to the pod registry. Expand tests. --- pkg/api/types.go | 10 ++++- pkg/controller/replication_controller.go | 3 +- pkg/registry/controller_registry_test.go | 2 +- pkg/registry/mock_registry.go | 2 + pkg/registry/pod_registry.go | 37 +++++++++++++++--- pkg/registry/pod_registry_test.go | 48 ++++++++++++++++++++++-- 6 files changed, 89 insertions(+), 13 deletions(-) diff --git a/pkg/api/types.go b/pkg/api/types.go index f7786b38eab..fb9a4b3b16b 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -100,10 +100,18 @@ type JSONBase struct { SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"` } +type PodStatus string + +const ( + PodRunning PodStatus = "Running" + PodPending PodStatus = "Pending" + PodStopped PodStatus = "Stopped" +) + // PodState is the state of a pod, used as either input (desired state) or output (current state) type PodState struct { Manifest ContainerManifest `json:"manifest,omitempty" yaml:"manifest,omitempty"` - Status string `json:"status,omitempty" yaml:"status,omitempty"` + Status PodStatus `json:"status,omitempty" yaml:"status,omitempty"` Host string `json:"host,omitempty" yaml:"host,omitempty"` HostIP string `json:"hostIP,omitempty" yaml:"hostIP,omitempty"` Info interface{} `json:"info,omitempty" yaml:"info,omitempty"` diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index e156247e544..133e5aac21a 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "math/rand" - "strings" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -170,7 +169,7 @@ func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod { var result []api.Pod for _, value := range pods { - if strings.Index(value.CurrentState.Status, "Exit") == -1 { + if api.PodStopped != value.CurrentState.Status { result = append(result, value) } } diff --git a/pkg/registry/controller_registry_test.go b/pkg/registry/controller_registry_test.go index 73728ca91dc..ffcbb2a6ea9 100644 --- a/pkg/registry/controller_registry_test.go +++ b/pkg/registry/controller_registry_test.go @@ -211,7 +211,7 @@ func TestCreateController(t *testing.T) { expectNoError(t, err) select { - case <-time.After(time.Second * 1): + case <-time.After(time.Millisecond * 100): // Do nothing, this is expected. case <-channel: t.Error("Unexpected read from async channel") diff --git a/pkg/registry/mock_registry.go b/pkg/registry/mock_registry.go index 0d36ec0700a..2db7b4fe5c0 100644 --- a/pkg/registry/mock_registry.go +++ b/pkg/registry/mock_registry.go @@ -66,8 +66,10 @@ func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error { func (registry *MockPodRegistry) UpdatePod(pod api.Pod) error { registry.Lock() defer registry.Unlock() + registry.pod = &pod return registry.err } + func (registry *MockPodRegistry) DeletePod(podId string) error { registry.Lock() defer registry.Unlock() diff --git a/pkg/registry/pod_registry.go b/pkg/registry/pod_registry.go index c8a1856c13f..6b1321366e4 100644 --- a/pkg/registry/pod_registry.go +++ b/pkg/registry/pod_registry.go @@ -19,6 +19,7 @@ package registry import ( "fmt" "strings" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" @@ -37,6 +38,7 @@ type PodRegistryStorage struct { scheduler scheduler.Scheduler minionLister scheduler.MinionLister cloud cloudprovider.Interface + podPollPeriod time.Duration } // MakePodRegistryStorage makes a RESTStorage object for a pod registry. @@ -60,6 +62,7 @@ func MakePodRegistryStorage(registry PodRegistry, minionLister: minionLister, cloud: cloud, podCache: podCache, + podPollPeriod: time.Second * 10, } } @@ -85,17 +88,17 @@ func (storage *PodRegistryStorage) List(selector labels.Selector) (interface{}, return result, err } -func makePodStatus(info interface{}) string { +func makePodStatus(info interface{}) api.PodStatus { if state, ok := info.(map[string]interface{})["State"]; ok { if running, ok := state.(map[string]interface{})["Running"]; ok { if running.(bool) { - return "Running" + return api.PodRunning } else { - return "Stopped" + return api.PodStopped } } } - return "Pending" + return api.PodPending } func getInstanceIP(cloud cloudprovider.Interface, host string) string { @@ -167,7 +170,7 @@ func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, if err != nil { return nil, err } - return storage.registry.GetPod(pod.ID) + return storage.waitForPodRunning(pod) }), nil } @@ -182,6 +185,28 @@ func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, if err != nil { return nil, err } - return storage.registry.GetPod(pod.ID) + return storage.waitForPodRunning(pod) }), nil } + +func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { + for { + podObj, err := storage.Get(pod.ID) + + if err != nil || podObj == nil { + return nil, err + } + podPtr, ok := podObj.(*api.Pod) + if !ok { + // This should really never happen. + return nil, fmt.Errorf("Error %#v is not an api.Pod!", podObj) + } + switch podPtr.CurrentState.Status { + case api.PodRunning, api.PodStopped: + return pod, nil + default: + time.Sleep(storage.podPollPeriod) + } + } + return pod, nil +} diff --git a/pkg/registry/pod_registry_test.go b/pkg/registry/pod_registry_test.go index e2e06f42cad..42505ca876c 100644 --- a/pkg/registry/pod_registry_test.go +++ b/pkg/registry/pod_registry_test.go @@ -20,10 +20,12 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" ) func expectNoError(t *testing.T, err error) { @@ -152,7 +154,7 @@ func TestGetPodCloud(t *testing.T) { func TestMakePodStatus(t *testing.T) { status := makePodStatus(map[string]interface{}{}) - if status != "Pending" { + if status != api.PodPending { t.Errorf("Expected 'Pending', got '%s'", status) } @@ -162,7 +164,7 @@ func TestMakePodStatus(t *testing.T) { }, }) - if status != "Stopped" { + if status != api.PodStopped { t.Errorf("Expected 'Stopped', got '%s'", status) } @@ -172,7 +174,47 @@ func TestMakePodStatus(t *testing.T) { }, }) - if status != "Running" { + if status != api.PodRunning { t.Errorf("Expected 'Running', got '%s'", status) } } + +func TestCreatePod(t *testing.T) { + mockRegistry := MockPodRegistry{ + pod: &api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{ + Status: api.PodPending, + }, + }, + } + storage := PodRegistryStorage{ + registry: &mockRegistry, + podPollPeriod: time.Millisecond * 100, + scheduler: scheduler.MakeRoundRobinScheduler(), + minionLister: MakeMinionRegistry([]string{"machine"}), + } + pod := api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + } + channel, err := storage.Create(pod) + expectNoError(t, err) + select { + case <-time.After(time.Millisecond * 100): + // Do nothing, this is expected. + case <-channel: + t.Error("Unexpected read from async channel") + } + mockRegistry.UpdatePod(api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{ + Status: api.PodRunning, + }, + }) + select { + case <-time.After(time.Second * 1): + t.Error("Unexpected timeout") + case <-channel: + // Do nothing, this is expected. + } +}