diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go
index 552c836c433..90479489f01 100644
--- a/pkg/controller/replication_controller.go
+++ b/pkg/controller/replication_controller.go
@@ -22,7 +22,6 @@ import (
 	"log"
 	"math/rand"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -38,7 +37,7 @@ type ReplicationManager struct {
 	etcdClient util.EtcdClient
 	kubeClient client.ClientInterface
 	podControl PodControlInterface
-	updateLock sync.Mutex
+	syncTime   <-chan time.Time
 }
 
 // An interface that knows how to add or delete pods
@@ -86,7 +85,7 @@ func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.Client
 
 // Begin watching and syncing.
 func (rm *ReplicationManager) Run(period time.Duration) {
-	go util.Forever(func() { rm.synchronize() }, period)
+	rm.syncTime = time.Tick(period)
 	go util.Forever(func() { rm.watchControllers() }, period)
 }
 
@@ -101,23 +100,27 @@ func (rm *ReplicationManager) watchControllers() {
 	}()
 
 	for {
-		watchResponse, ok := <-watchChannel
-		if !ok {
-			// watchChannel has been closed. Let the util.Forever() that
-			// called us call us again.
-			return
+		select {
+		case <-rm.syncTime:
+			rm.synchronize()
+		case watchResponse, ok := <-watchChannel:
+			if !ok {
+				// watchChannel has been closed. Let the util.Forever() that
+				// called us call us again.
+				return
+			}
+			if watchResponse == nil {
+				time.Sleep(time.Second * 10)
+				continue
+			}
+			log.Printf("Got watch: %#v", watchResponse)
+			controller, err := rm.handleWatchResponse(watchResponse)
+			if err != nil {
+				log.Printf("Error handling data: %#v, %#v", err, watchResponse)
+				continue
+			}
+			rm.syncReplicationController(*controller)
 		}
-		if watchResponse == nil {
-			time.Sleep(time.Second * 10)
-			continue
-		}
-		log.Printf("Got watch: %#v", watchResponse)
-		controller, err := rm.handleWatchResponse(watchResponse)
-		if err != nil {
-			log.Printf("Error handling data: %#v, %#v", err, watchResponse)
-			continue
-		}
-		rm.syncReplicationController(*controller)
 	}
 }
 
@@ -148,8 +151,6 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
 }
 
 func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
-	rm.updateLock.Lock()
-	defer rm.updateLock.Unlock()
 	podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet)
 	if err != nil {
 		return err
@@ -173,30 +174,16 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
 }
 
 func (rm *ReplicationManager) synchronize() {
-	response, err := rm.etcdClient.Get("/registry/controllers", false, false)
+	var controllerSpecs []api.ReplicationController
+	helper := util.EtcdHelper{rm.etcdClient}
+	err := helper.ExtractList("/registry/controllers", &controllerSpecs)
 	if err != nil {
-		log.Printf("Synchronization error %#v", err)
+		log.Printf("Synchronization error: %v (%#v)", err, err)
 	}
-	// TODO(bburns): There is a race here, if we get a version of the controllers, and then it is
-	// updated, its possible that the watch will pick up the change first, and then we will execute
-	// using the old version of the controller.
-	// Probably the correct thing to do is to use the version number in etcd to detect when
-	// we are stale.
-	// Punting on this for now, but this could lead to some nasty bugs, so we should really fix it
-	// sooner rather than later.
-	if response != nil && response.Node != nil && response.Node.Nodes != nil {
-		for _, value := range response.Node.Nodes {
-			var controllerSpec api.ReplicationController
-			err := json.Unmarshal([]byte(value.Value), &controllerSpec)
-			if err != nil {
-				log.Printf("Unexpected error: %#v", err)
-				continue
-			}
-			log.Printf("Synchronizing %s\n", controllerSpec.ID)
-			err = rm.syncReplicationController(controllerSpec)
-			if err != nil {
-				log.Printf("Error synchronizing: %#v", err)
-			}
+	for _, controllerSpec := range controllerSpecs {
+		err = rm.syncReplicationController(controllerSpec)
+		if err != nil {
+			log.Printf("Error synchronizing: %#v", err)
 		}
 	}
 }
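
Note for reviewers: the core of this change is replacing the updateLock mutex with a single goroutine that multiplexes the resync ticker and the etcd watch channel via select, so synchronize() and watch handling can never run concurrently. Below is a minimal, self-contained sketch of that pattern under illustrative names — manager, watchEvent, run, and handleEvent are stand-ins for this demo, not types or functions from this package.

package main

import (
	"fmt"
	"time"
)

// watchEvent is a hypothetical stand-in for the etcd watch responses the
// real controller receives; only its arrival on the channel matters here.
type watchEvent struct {
	key string
}

// manager mirrors the shape of the patched ReplicationManager: instead of a
// mutex it owns a ticker channel, and both periodic resyncs and watch events
// are funneled through one goroutine.
type manager struct {
	syncTime <-chan time.Time
}

func (m *manager) synchronize() {
	fmt.Println("full resync")
}

func (m *manager) handleEvent(ev watchEvent) {
	fmt.Println("sync from watch:", ev.key)
}

// run multiplexes the ticker and the watch channel with select, the same
// structure the patch introduces in watchControllers(). Because a select
// case runs to completion before the next one is chosen, the two code paths
// are serialized without any locking.
func (m *manager) run(watch <-chan watchEvent, stop <-chan struct{}) {
	for {
		select {
		case <-m.syncTime:
			m.synchronize()
		case ev, ok := <-watch:
			if !ok {
				// Watch channel closed; the caller decides whether to
				// re-establish it (util.Forever plays that role here).
				return
			}
			m.handleEvent(ev)
		case <-stop:
			return
		}
	}
}

func main() {
	watch := make(chan watchEvent)
	stop := make(chan struct{})
	m := &manager{syncTime: time.Tick(100 * time.Millisecond)}

	go m.run(watch, stop)

	watch <- watchEvent{key: "/registry/controllers/foo"}
	time.Sleep(250 * time.Millisecond) // let a couple of ticks fire
	close(stop)
}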