From 51c5907c554b02125fee490bec407286e1365dbe Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 24 Jul 2014 22:03:07 -0700 Subject: [PATCH 1/2] Make individual controller actions asynchronous. --- pkg/controller/replication_controller.go | 17 +++++++++++++++-- pkg/controller/replication_controller_test.go | 6 ++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index b5d283e23a4..be23725b28d 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -19,6 +19,7 @@ package controller import ( "encoding/json" "fmt" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -182,15 +183,27 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli diff := len(filteredList) - controllerSpec.DesiredState.Replicas if diff < 0 { diff *= -1 + wait := sync.WaitGroup{} + wait.Add(diff) glog.Infof("Too few replicas, creating %d\n", diff) for i := 0; i < diff; i++ { - rm.podControl.createReplica(controllerSpec) + go func() { + defer wait.Done() + rm.podControl.createReplica(controllerSpec) + }() } + wait.Wait() } else if diff > 0 { glog.Infof("Too many replicas, deleting %d\n", diff) + wait := sync.WaitGroup{} + wait.Add(diff) for i := 0; i < diff; i++ { - rm.podControl.deletePod(filteredList[i].ID) + go func(ix int) { + defer wait.Done() + rm.podControl.deletePod(filteredList[ix].ID) + }(i) } + wait.Wait() } return nil } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 95530b980c2..1d6b0e1cf5b 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http/httptest" "reflect" + "sync" "testing" "time" @@ -41,13 +42,18 @@ func makeURL(suffix string) string { type FakePodControl struct { controllerSpec []api.ReplicationController deletePodID []string + lock sync.Mutex } func (f *FakePodControl) createReplica(spec api.ReplicationController) { + f.lock.Lock() + defer f.lock.Unlock() f.controllerSpec = append(f.controllerSpec, spec) } func (f *FakePodControl) deletePod(podID string) error { + f.lock.Lock() + defer f.lock.Unlock() f.deletePodID = append(f.deletePodID, podID) return nil } From 1a3e4f8bafc0449358b9e61026d127ae78c95c4c Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Sat, 26 Jul 2014 22:00:34 -0700 Subject: [PATCH 2/2] Add some more synchronization. --- pkg/kubelet/fake_docker_client.go | 2 ++ pkg/registry/pod_registry.go | 20 ++++++++++++++------ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pkg/kubelet/fake_docker_client.go b/pkg/kubelet/fake_docker_client.go index b5cc3d55903..baaf58d090c 100644 --- a/pkg/kubelet/fake_docker_client.go +++ b/pkg/kubelet/fake_docker_client.go @@ -36,6 +36,8 @@ type FakeDockerClient struct { } func (f *FakeDockerClient) clearCalls() { + f.lock.Lock() + defer f.lock.Unlock() f.called = []string{} } diff --git a/pkg/registry/pod_registry.go b/pkg/registry/pod_registry.go index e358ea71853..53849f1956d 100644 --- a/pkg/registry/pod_registry.go +++ b/pkg/registry/pod_registry.go @@ -19,6 +19,7 @@ package registry import ( "fmt" "strings" + "sync" "time" "code.google.com/p/go-uuid/uuid" @@ -40,6 +41,7 @@ type PodRegistryStorage struct { minionLister scheduler.MinionLister cloud cloudprovider.Interface podPollPeriod time.Duration + lock sync.Mutex } // MakePodRegistryStorage makes a RESTStorage object for a pod registry. @@ -193,6 +195,17 @@ func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) { return pod, err } +func (storage *PodRegistryStorage) scheduleAndCreatePod(pod api.Pod) error { + storage.lock.Lock() + defer storage.lock.Unlock() + // TODO(lavalamp): Separate scheduler more cleanly. + machine, err := storage.scheduler.Schedule(pod, storage.minionLister) + if err != nil { + return err + } + return storage.registry.CreatePod(machine, pod) +} + func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { pod := obj.(api.Pod) if len(pod.ID) == 0 { @@ -201,12 +214,7 @@ func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, pod.DesiredState.Manifest.ID = pod.ID return apiserver.MakeAsync(func() (interface{}, error) { - // TODO(lavalamp): Separate scheduler more cleanly. - machine, err := storage.scheduler.Schedule(pod, storage.minionLister) - if err != nil { - return nil, err - } - err = storage.registry.CreatePod(machine, pod) + err := storage.scheduleAndCreatePod(pod) if err != nil { return nil, err }