Add sync behavior to the pod registry. Expand tests.

This commit is contained in:
Brendan Burns 2014-06-27 17:43:36 -07:00
parent d75bd790d3
commit 13d7a5959a
6 changed files with 89 additions and 13 deletions

View File

@ -100,10 +100,18 @@ type JSONBase struct {
SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"`
}
type PodStatus string
const (
PodRunning PodStatus = "Running"
PodPending PodStatus = "Pending"
PodStopped PodStatus = "Stopped"
)
// PodState is the state of a pod, used as either input (desired state) or output (current state)
type PodState struct {
Manifest ContainerManifest `json:"manifest,omitempty" yaml:"manifest,omitempty"`
Status string `json:"status,omitempty" yaml:"status,omitempty"`
Status PodStatus `json:"status,omitempty" yaml:"status,omitempty"`
Host string `json:"host,omitempty" yaml:"host,omitempty"`
HostIP string `json:"hostIP,omitempty" yaml:"hostIP,omitempty"`
Info interface{} `json:"info,omitempty" yaml:"info,omitempty"`

View File

@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"math/rand"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -170,7 +169,7 @@ func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
for _, value := range pods {
if strings.Index(value.CurrentState.Status, "Exit") == -1 {
if api.PodStopped != value.CurrentState.Status {
result = append(result, value)
}
}

View File

@ -211,7 +211,7 @@ func TestCreateController(t *testing.T) {
expectNoError(t, err)
select {
case <-time.After(time.Second * 1):
case <-time.After(time.Millisecond * 100):
// Do nothing, this is expected.
case <-channel:
t.Error("Unexpected read from async channel")

View File

@ -66,8 +66,10 @@ func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error {
func (registry *MockPodRegistry) UpdatePod(pod api.Pod) error {
registry.Lock()
defer registry.Unlock()
registry.pod = &pod
return registry.err
}
func (registry *MockPodRegistry) DeletePod(podId string) error {
registry.Lock()
defer registry.Unlock()

View File

@ -19,6 +19,7 @@ package registry
import (
"fmt"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
@ -37,6 +38,7 @@ type PodRegistryStorage struct {
scheduler scheduler.Scheduler
minionLister scheduler.MinionLister
cloud cloudprovider.Interface
podPollPeriod time.Duration
}
// MakePodRegistryStorage makes a RESTStorage object for a pod registry.
@ -60,6 +62,7 @@ func MakePodRegistryStorage(registry PodRegistry,
minionLister: minionLister,
cloud: cloud,
podCache: podCache,
podPollPeriod: time.Second * 10,
}
}
@ -85,17 +88,17 @@ func (storage *PodRegistryStorage) List(selector labels.Selector) (interface{},
return result, err
}
func makePodStatus(info interface{}) string {
func makePodStatus(info interface{}) api.PodStatus {
if state, ok := info.(map[string]interface{})["State"]; ok {
if running, ok := state.(map[string]interface{})["Running"]; ok {
if running.(bool) {
return "Running"
return api.PodRunning
} else {
return "Stopped"
return api.PodStopped
}
}
}
return "Pending"
return api.PodPending
}
func getInstanceIP(cloud cloudprovider.Interface, host string) string {
@ -167,7 +170,7 @@ func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{},
if err != nil {
return nil, err
}
return storage.registry.GetPod(pod.ID)
return storage.waitForPodRunning(pod)
}), nil
}
@ -182,6 +185,28 @@ func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{},
if err != nil {
return nil, err
}
return storage.registry.GetPod(pod.ID)
return storage.waitForPodRunning(pod)
}), nil
}
func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) {
for {
podObj, err := storage.Get(pod.ID)
if err != nil || podObj == nil {
return nil, err
}
podPtr, ok := podObj.(*api.Pod)
if !ok {
// This should really never happen.
return nil, fmt.Errorf("Error %#v is not an api.Pod!", podObj)
}
switch podPtr.CurrentState.Status {
case api.PodRunning, api.PodStopped:
return pod, nil
default:
time.Sleep(storage.podPollPeriod)
}
}
return pod, nil
}

View File

@ -20,10 +20,12 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
)
func expectNoError(t *testing.T, err error) {
@ -152,7 +154,7 @@ func TestGetPodCloud(t *testing.T) {
func TestMakePodStatus(t *testing.T) {
status := makePodStatus(map[string]interface{}{})
if status != "Pending" {
if status != api.PodPending {
t.Errorf("Expected 'Pending', got '%s'", status)
}
@ -162,7 +164,7 @@ func TestMakePodStatus(t *testing.T) {
},
})
if status != "Stopped" {
if status != api.PodStopped {
t.Errorf("Expected 'Stopped', got '%s'", status)
}
@ -172,7 +174,47 @@ func TestMakePodStatus(t *testing.T) {
},
})
if status != "Running" {
if status != api.PodRunning {
t.Errorf("Expected 'Running', got '%s'", status)
}
}
func TestCreatePod(t *testing.T) {
mockRegistry := MockPodRegistry{
pod: &api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
CurrentState: api.PodState{
Status: api.PodPending,
},
},
}
storage := PodRegistryStorage{
registry: &mockRegistry,
podPollPeriod: time.Millisecond * 100,
scheduler: scheduler.MakeRoundRobinScheduler(),
minionLister: MakeMinionRegistry([]string{"machine"}),
}
pod := api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
}
channel, err := storage.Create(pod)
expectNoError(t, err)
select {
case <-time.After(time.Millisecond * 100):
// Do nothing, this is expected.
case <-channel:
t.Error("Unexpected read from async channel")
}
mockRegistry.UpdatePod(api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
CurrentState: api.PodState{
Status: api.PodRunning,
},
})
select {
case <-time.After(time.Second * 1):
t.Error("Unexpected timeout")
case <-channel:
// Do nothing, this is expected.
}
}