Make individual controller actions asynchronous.

This commit is contained in:
Brendan Burns 2014-07-24 22:03:07 -07:00
parent cb28f25b1b
commit 51c5907c55
2 changed files with 21 additions and 2 deletions

View File

@ -19,6 +19,7 @@ package controller
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -182,15 +183,27 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
diff := len(filteredList) - controllerSpec.DesiredState.Replicas diff := len(filteredList) - controllerSpec.DesiredState.Replicas
if diff < 0 { if diff < 0 {
diff *= -1 diff *= -1
wait := sync.WaitGroup{}
wait.Add(diff)
glog.Infof("Too few replicas, creating %d\n", diff) glog.Infof("Too few replicas, creating %d\n", diff)
for i := 0; i < diff; i++ { for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
rm.podControl.createReplica(controllerSpec) rm.podControl.createReplica(controllerSpec)
}()
} }
wait.Wait()
} else if diff > 0 { } else if diff > 0 {
glog.Infof("Too many replicas, deleting %d\n", diff) glog.Infof("Too many replicas, deleting %d\n", diff)
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ { for i := 0; i < diff; i++ {
rm.podControl.deletePod(filteredList[i].ID) go func(ix int) {
defer wait.Done()
rm.podControl.deletePod(filteredList[ix].ID)
}(i)
} }
wait.Wait()
} }
return nil return nil
} }

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"sync"
"testing" "testing"
"time" "time"
@ -41,13 +42,18 @@ func makeURL(suffix string) string {
type FakePodControl struct { type FakePodControl struct {
controllerSpec []api.ReplicationController controllerSpec []api.ReplicationController
deletePodID []string deletePodID []string
lock sync.Mutex
} }
func (f *FakePodControl) createReplica(spec api.ReplicationController) { func (f *FakePodControl) createReplica(spec api.ReplicationController) {
f.lock.Lock()
defer f.lock.Unlock()
f.controllerSpec = append(f.controllerSpec, spec) f.controllerSpec = append(f.controllerSpec, spec)
} }
func (f *FakePodControl) deletePod(podID string) error { func (f *FakePodControl) deletePod(podID string) error {
f.lock.Lock()
defer f.lock.Unlock()
f.deletePodID = append(f.deletePodID, podID) f.deletePodID = append(f.deletePodID, podID)
return nil return nil
} }