Implement sync behavior for controllers.

This commit is contained in:
Brendan Burns 2014-06-26 12:43:08 -07:00
parent ab308ad13a
commit a391b2ff03
4 changed files with 77 additions and 7 deletions

View File

@ -47,7 +47,7 @@ func (c *Client) Verb(verb string) *Request {
c: c,
path: "/api/v1beta1",
sync: true,
timeout: 10 * time.Second,
timeout: 20 * time.Second,
pollPeriod: 20 * time.Second,
}
}

View File

@ -77,7 +77,7 @@ func (m *Master) init(cloud cloudprovider.Interface) {
go podCache.Loop()
m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minionRegistry, m.podRegistry, m.random), cloud, podCache),
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry),
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"minions": registry.MakeMinionRegistryStorage(m.minionRegistry),
}

View File

@ -18,6 +18,7 @@ package registry
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
@ -26,12 +27,17 @@ import (
// Implementation of RESTStorage for the api server.
type ControllerRegistryStorage struct {
registry ControllerRegistry
registry ControllerRegistry
podRegistry PodRegistry
// Period in between polls when waiting for a controller to complete
pollPeriod time.Duration
}
func MakeControllerRegistryStorage(registry ControllerRegistry) apiserver.RESTStorage {
func MakeControllerRegistryStorage(registry ControllerRegistry, podRegistry PodRegistry) apiserver.RESTStorage {
return &ControllerRegistryStorage{
registry: registry,
registry: registry,
podRegistry: podRegistry,
pollPeriod: time.Second * 10,
}
}
@ -81,7 +87,7 @@ func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interf
if err != nil {
return nil, err
}
return storage.registry.GetController(controller.ID)
return storage.waitForController(controller)
}), nil
}
@ -98,6 +104,20 @@ func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interf
if err != nil {
return nil, err
}
return storage.registry.GetController(controller.ID)
return storage.waitForController(controller)
}), nil
}
func (storage *ControllerRegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) {
for {
pods, err := storage.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
return ctrl, err
}
if len(pods) == ctrl.DesiredState.Replicas {
break
}
time.Sleep(storage.pollPeriod)
}
return ctrl, nil
}

View File

@ -22,6 +22,7 @@ import (
"io/ioutil"
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -185,3 +186,52 @@ func TestControllerParsing(t *testing.T) {
t.Errorf("Parsing failed: %s %#v %#v", string(data), controller, expectedController)
}
}
func TestCreateController(t *testing.T) {
mockRegistry := MockControllerRegistry{}
mockPodRegistry := MockPodRegistry{
pods: []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
},
},
}
storage := ControllerRegistryStorage{
registry: &mockRegistry,
podRegistry: &mockPodRegistry,
pollPeriod: time.Millisecond * 1,
}
controller := api.ReplicationController{
JSONBase: api.JSONBase{ID: "test"},
DesiredState: api.ReplicationControllerState{
Replicas: 2,
},
}
channel, err := storage.Create(controller)
expectNoError(t, err)
select {
case <-time.After(time.Second * 1):
// Do nothing, this is expected.
case <-channel:
t.Error("Unexpected read from async channel")
}
mockPodRegistry.pods = []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
},
{
JSONBase: api.JSONBase{ID: "bar"},
},
}
time.Sleep(time.Millisecond * 30)
select {
case <-time.After(time.Second * 1):
t.Error("Unexpected timeout")
case <-channel:
// Do nothing, this is expected
}
}