commit 1b94f7b244
parent 7e464aa55c
Author: Daniel Smith
Date:   2014-06-17 17:56:18 -07:00


@@ -22,7 +22,6 @@ import (
 	"log"
 	"math/rand"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -38,7 +37,7 @@ type ReplicationManager struct {
 	etcdClient util.EtcdClient
 	kubeClient client.ClientInterface
 	podControl PodControlInterface
-	updateLock sync.Mutex
+	syncTime   <-chan time.Time
 }
 
 // An interface that knows how to add or delete pods
@@ -86,7 +85,7 @@ func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.Client
 
 // Begin watching and syncing.
 func (rm *ReplicationManager) Run(period time.Duration) {
-	go util.Forever(func() { rm.synchronize() }, period)
+	rm.syncTime = time.Tick(period)
 	go util.Forever(func() { rm.watchControllers() }, period)
 }
 
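Note on the Run change: rm.synchronize() previously ran on its own goroutine via util.Forever, concurrently with the watch loop; that concurrency is what updateLock guarded. time.Tick returns a receive-only <-chan time.Time, so storing it on the struct lets the watch loop consume the timer as just another channel. A minimal runnable sketch of this wiring, with a hypothetical manager type standing in for ReplicationManager:

package main

import (
	"fmt"
	"time"
)

// manager is a stand-in for ReplicationManager; only the timing
// wiring is modeled here.
type manager struct {
	syncTime <-chan time.Time
}

func (m *manager) run(period time.Duration) {
	// time.Tick returns a receive-only channel that fires every
	// period; the event loop consumes it instead of running a
	// second periodic goroutine.
	m.syncTime = time.Tick(period)
	go m.loop()
}

func (m *manager) loop() {
	for t := range m.syncTime {
		fmt.Println("periodic resync at", t)
	}
}

func main() {
	m := &manager{}
	m.run(100 * time.Millisecond)
	time.Sleep(350 * time.Millisecond) // observe a few ticks, then exit
}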
@@ -101,23 +100,27 @@ func (rm *ReplicationManager) watchControllers() {
 	}()
 
 	for {
-		watchResponse, ok := <-watchChannel
-		if !ok {
-			// watchChannel has been closed. Let the util.Forever() that
-			// called us call us again.
-			return
-		}
-		if watchResponse == nil {
-			time.Sleep(time.Second * 10)
-			continue
-		}
-		log.Printf("Got watch: %#v", watchResponse)
-		controller, err := rm.handleWatchResponse(watchResponse)
-		if err != nil {
-			log.Printf("Error handling data: %#v, %#v", err, watchResponse)
-			continue
-		}
-		rm.syncReplicationController(*controller)
+		select {
+		case <-rm.syncTime:
+			rm.synchronize()
+		case watchResponse, ok := <-watchChannel:
+			if !ok {
+				// watchChannel has been closed. Let the util.Forever() that
+				// called us call us again.
+				return
+			}
+			if watchResponse == nil {
+				time.Sleep(time.Second * 10)
+				continue
+			}
+			log.Printf("Got watch: %#v", watchResponse)
+			controller, err := rm.handleWatchResponse(watchResponse)
+			if err != nil {
+				log.Printf("Error handling data: %#v, %#v", err, watchResponse)
+				continue
+			}
+			rm.syncReplicationController(*controller)
+		}
 	}
 }
 
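This select is the heart of the commit: periodic resyncs and watch events are now multiplexed onto the single watchControllers goroutine, and since two cases of one select never run concurrently, the mutex (removed below in syncReplicationController) is no longer needed. A self-contained sketch of the pattern, assuming illustrative string events in place of the real watch responses:

package main

import (
	"fmt"
	"time"
)

// eventLoop multiplexes a ticker and an event channel. Both cases
// run on the same goroutine, so handlers never execute concurrently
// and shared state needs no lock.
func eventLoop(syncTime <-chan time.Time, events <-chan string) {
	for {
		select {
		case <-syncTime:
			fmt.Println("full resync")
		case ev, ok := <-events:
			if !ok {
				return // channel closed; caller restarts us
			}
			fmt.Println("handling event:", ev)
		}
	}
}

func main() {
	events := make(chan string, 1)
	events <- "controller updated"
	close(events) // drained event, then ok=false ends the loop
	eventLoop(time.Tick(50*time.Millisecond), events)
}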
@@ -148,8 +151,6 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
 }
 
 func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
-	rm.updateLock.Lock()
-	defer rm.updateLock.Unlock()
 	podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet)
 	if err != nil {
 		return err
@@ -173,30 +174,16 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
 }
 
 func (rm *ReplicationManager) synchronize() {
-	response, err := rm.etcdClient.Get("/registry/controllers", false, false)
+	var controllerSpecs []api.ReplicationController
+	helper := util.EtcdHelper{rm.etcdClient}
+	err := helper.ExtractList("/registry/controllers", &controllerSpecs)
 	if err != nil {
-		log.Printf("Synchronization error %#v", err)
+		log.Printf("Synchronization error: %v (%#v)", err, err)
 	}
-	// TODO(bburns): There is a race here, if we get a version of the controllers, and then it is
-	// updated, its possible that the watch will pick up the change first, and then we will execute
-	// using the old version of the controller.
-	// Probably the correct thing to do is to use the version number in etcd to detect when
-	// we are stale.
-	// Punting on this for now, but this could lead to some nasty bugs, so we should really fix it
-	// sooner rather than later.
-	if response != nil && response.Node != nil && response.Node.Nodes != nil {
-		for _, value := range response.Node.Nodes {
-			var controllerSpec api.ReplicationController
-			err := json.Unmarshal([]byte(value.Value), &controllerSpec)
-			if err != nil {
-				log.Printf("Unexpected error: %#v", err)
-				continue
-			}
-			log.Printf("Synchronizing %s\n", controllerSpec.ID)
-			err = rm.syncReplicationController(controllerSpec)
-			if err != nil {
-				log.Printf("Error synchronizing: %#v", err)
-			}
+	for _, controllerSpec := range controllerSpecs {
+		err = rm.syncReplicationController(controllerSpec)
+		if err != nil {
+			log.Printf("Error synchronizing: %#v", err)
 		}
 	}
 }
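util.EtcdHelper.ExtractList absorbs the hand-rolled nil checks and per-node json.Unmarshal from the old synchronize(). Its real implementation is not part of this diff; the following is a sketch of what such a helper plausibly does (fetch a directory key, JSON-decode each child into a slice via reflection), with stand-in types replacing the actual etcd client:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"reflect"
)

// Stand-in shapes mirroring what the old code read from the etcd
// response (response.Node.Nodes[i].Value).
type node struct {
	Value string
	Nodes []node
}
type response struct{ Node *node }

type etcdGetter interface {
	Get(key string, sort, recursive bool) (*response, error)
}

// extractList sketches what a helper like EtcdHelper.ExtractList does:
// fetch a directory key, JSON-decode each child value, and append the
// results to the slice that slicePtr points at.
func extractList(client etcdGetter, key string, slicePtr interface{}) error {
	resp, err := client.Get(key, false, false)
	if err != nil {
		return err
	}
	if resp == nil || resp.Node == nil {
		return nil // nothing stored under the key yet; leave slice empty
	}
	v := reflect.ValueOf(slicePtr).Elem()
	for _, n := range resp.Node.Nodes {
		item := reflect.New(v.Type().Elem()) // *T for slice element type T
		if err := json.Unmarshal([]byte(n.Value), item.Interface()); err != nil {
			return err
		}
		v.Set(reflect.Append(v, item.Elem()))
	}
	return nil
}

// fakeClient returns one canned controller so main is runnable.
type fakeClient struct{}

func (fakeClient) Get(key string, sort, recursive bool) (*response, error) {
	return &response{Node: &node{Nodes: []node{{Value: `{"ID":"frontend"}`}}}}, nil
}

func main() {
	var specs []struct{ ID string }
	if err := extractList(fakeClient{}, "/registry/controllers", &specs); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("extracted: %+v\n", specs)
}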