diff --git a/pkg/client/request.go b/pkg/client/request.go index 16d5f7061d8..072285a0fe7 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -47,7 +47,7 @@ func (c *Client) Verb(verb string) *Request { c: c, path: "/api/v1beta1", sync: true, - timeout: 10 * time.Second, + timeout: 20 * time.Second, pollPeriod: 20 * time.Second, } } diff --git a/pkg/master/master.go b/pkg/master/master.go index 4cfdbf52546..1985fb33e04 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -77,7 +77,7 @@ func (m *Master) init(cloud cloudprovider.Interface) { go podCache.Loop() m.storage = map[string]apiserver.RESTStorage{ "pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minionRegistry, m.podRegistry, m.random), cloud, podCache), - "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry), + "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry, m.podRegistry), "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), "minions": registry.MakeMinionRegistryStorage(m.minionRegistry), } diff --git a/pkg/registry/controller_registry.go b/pkg/registry/controller_registry.go index 64fac8258d1..296ce9c756a 100644 --- a/pkg/registry/controller_registry.go +++ b/pkg/registry/controller_registry.go @@ -18,6 +18,7 @@ package registry import ( "fmt" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" @@ -26,12 +27,17 @@ import ( // Implementation of RESTStorage for the api server. type ControllerRegistryStorage struct { - registry ControllerRegistry + registry ControllerRegistry + podRegistry PodRegistry + // Period in between polls when waiting for a controller to complete + pollPeriod time.Duration } -func MakeControllerRegistryStorage(registry ControllerRegistry) apiserver.RESTStorage { +func MakeControllerRegistryStorage(registry ControllerRegistry, podRegistry PodRegistry) apiserver.RESTStorage { return &ControllerRegistryStorage{ - registry: registry, + registry: registry, + podRegistry: podRegistry, + pollPeriod: time.Second * 10, } } @@ -81,7 +87,7 @@ func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interf if err != nil { return nil, err } - return storage.registry.GetController(controller.ID) + return storage.waitForController(controller) }), nil } @@ -98,6 +104,20 @@ func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interf if err != nil { return nil, err } - return storage.registry.GetController(controller.ID) + return storage.waitForController(controller) }), nil } + +func (storage *ControllerRegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { + for { + pods, err := storage.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) + if err != nil { + return ctrl, err + } + if len(pods) == ctrl.DesiredState.Replicas { + break + } + time.Sleep(storage.pollPeriod) + } + return ctrl, nil +} diff --git a/pkg/registry/controller_registry_test.go b/pkg/registry/controller_registry_test.go index 801b7ef064f..693db2019c2 100644 --- a/pkg/registry/controller_registry_test.go +++ b/pkg/registry/controller_registry_test.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -185,3 +186,52 @@ func TestControllerParsing(t *testing.T) { t.Errorf("Parsing failed: %s %#v %#v", string(data), controller, expectedController) } } + +func TestCreateController(t *testing.T) { + mockRegistry := MockControllerRegistry{} + mockPodRegistry := MockPodRegistry{ + pods: []api.Pod{ + { + JSONBase: api.JSONBase{ID: "foo"}, + }, + }, + } + storage := ControllerRegistryStorage{ + registry: &mockRegistry, + podRegistry: &mockPodRegistry, + pollPeriod: time.Millisecond * 1, + } + controller := api.ReplicationController{ + JSONBase: api.JSONBase{ID: "test"}, + DesiredState: api.ReplicationControllerState{ + Replicas: 2, + }, + } + channel, err := storage.Create(controller) + expectNoError(t, err) + + select { + case <-time.After(time.Second * 1): + // Do nothing, this is expected. + case <-channel: + t.Error("Unexpected read from async channel") + } + + mockPodRegistry.pods = []api.Pod{ + { + JSONBase: api.JSONBase{ID: "foo"}, + }, + { + JSONBase: api.JSONBase{ID: "bar"}, + }, + } + + time.Sleep(time.Millisecond * 30) + + select { + case <-time.After(time.Second * 1): + t.Error("Unexpected timeout") + case <-channel: + // Do nothing, this is expected + } +}