Cleanup handling of config channels in RunSyncLoop by passing a map

instead of a bunch of vars.
This commit is contained in:
Justin Huff 2014-06-21 14:20:35 -07:00
parent dc35a98ebe
commit 1441a84673
4 changed files with 75 additions and 72 deletions

View File

@ -77,31 +77,39 @@ type Kubelet struct {
pullLock sync.Mutex pullLock sync.Mutex
} }
type manifestUpdate struct {
source string
manifests []api.ContainerManifest
}
const (
fileSource = "file"
etcdSource = "etcd"
httpClientSource = "http_client"
httpServerSource = "http_server"
)
// Starts background goroutines. If file, manifest_url, or address are empty, // Starts background goroutines. If file, manifest_url, or address are empty,
// they are not watched. Never returns. // they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {
fileChannel := make(chan []api.ContainerManifest) updateChannel := make(chan manifestUpdate)
etcdChannel := make(chan []api.ContainerManifest)
httpChannel := make(chan []api.ContainerManifest)
serverChannel := make(chan []api.ContainerManifest)
if file != "" { if file != "" {
go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) go util.Forever(func() { kl.WatchFile(file, updateChannel) }, 20*time.Second)
} }
if manifest_url != "" { if manifest_url != "" {
go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) go util.Forever(func() { kl.WatchHTTP(manifest_url, updateChannel) }, 20*time.Second)
} }
if etcd_servers != "" { if etcd_servers != "" {
servers := []string{etcd_servers} servers := []string{etcd_servers}
log.Printf("Creating etcd client pointing to %v", servers) log.Printf("Creating etcd client pointing to %v", servers)
kl.Client = etcd.NewClient(servers) kl.Client = etcd.NewClient(servers)
go util.Forever(func() { kl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second) go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second)
} }
if address != "" { if address != "" {
log.Printf("Starting to listen on %s:%d", address, port) log.Printf("Starting to listen on %s:%d", address, port)
handler := KubeletServer{ handler := KubeletServer{
Kubelet: kl, Kubelet: kl,
UpdateChannel: serverChannel, UpdateChannel: updateChannel,
} }
s := &http.Server{ s := &http.Server{
// TODO: This is broken if address is an ipv6 address. // TODO: This is broken if address is an ipv6 address.
@ -113,7 +121,7 @@ func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string,
} }
go util.Forever(func() { s.ListenAndServe() }, 0) go util.Forever(func() { s.ListenAndServe() }, 0)
} }
kl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, kl) kl.RunSyncLoop(updateChannel, kl)
} }
// Interface implemented by Kubelet, for testability // Interface implemented by Kubelet, for testability
@ -413,23 +421,22 @@ func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error)
return manifests, nil return manifests, nil
} }
func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { func (kl *Kubelet) extractMultipleFromReader(reader io.Reader) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest var manifests []api.ContainerManifest
data, err := ioutil.ReadAll(reader) data, err := ioutil.ReadAll(reader)
if err != nil { if err != nil {
log.Printf("Couldn't read from reader: %v", err) log.Printf("Couldn't read from reader: %v", err)
return err return manifests, err
} }
if err = kl.ExtractYAMLData(data, &manifests); err != nil { if err = kl.ExtractYAMLData(data, &manifests); err != nil {
return err return manifests, err
} }
changeChannel <- manifests return manifests, nil
return nil
} }
// Watch a file for changes to the set of pods that should run on this Kubelet // Watch a file for changes to the set of pods that should run on this Kubelet
// This function loops forever and is intended to be run as a goroutine // This function loops forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { func (kl *Kubelet) WatchFile(file string, updateChannel chan<- manifestUpdate) {
for { for {
var err error var err error
@ -446,19 +453,19 @@ func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerMa
log.Printf("Error polling dir: %#v", err) log.Printf("Error polling dir: %#v", err)
continue continue
} }
changeChannel <- manifests updateChannel <- manifestUpdate{fileSource, manifests}
} else { } else {
manifest, err := kl.extractFromFile(file) manifest, err := kl.extractFromFile(file)
if err != nil { if err != nil {
log.Printf("Error polling file: %#v", err) log.Printf("Error polling file: %#v", err)
continue continue
} }
changeChannel <- []api.ContainerManifest{manifest} updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
} }
} }
} }
func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.ContainerManifest) error { func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
client := &http.Client{} client := &http.Client{}
request, err := http.NewRequest("GET", url, nil) request, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
@ -469,16 +476,21 @@ func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.Contai
return err return err
} }
defer response.Body.Close() defer response.Body.Close()
return kl.extractMultipleFromReader(response.Body, changeChannel) manifests, err := kl.extractMultipleFromReader(response.Body)
if err != nil {
return err
}
updateChannel <- manifestUpdate{httpClientSource, manifests}
return nil
} }
// Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet // Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet
// This function runs forever and is intended to be run as a goroutine // This function runs forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- []api.ContainerManifest) { func (kl *Kubelet) WatchHTTP(url string, updateChannel chan<- manifestUpdate) {
for { for {
var err error var err error
time.Sleep(kl.HTTPCheckFrequency) time.Sleep(kl.HTTPCheckFrequency)
err = kl.extractFromHTTP(url, changeChannel) err = kl.extractFromHTTP(url, updateChannel)
if err != nil { if err != nil {
log.Printf("Error syncing http: %#v", err) log.Printf("Error syncing http: %#v", err)
} }
@ -496,7 +508,7 @@ func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.Container
return manifests, err return manifests, err
} }
func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error { func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
response, err := kl.Client.Get(key+"/kubelet", true, false) response, err := kl.Client.Get(key+"/kubelet", true, false)
if err != nil { if err != nil {
log.Printf("Error on get on %s: %#v", key, err) log.Printf("Error on get on %s: %#v", key, err)
@ -515,18 +527,18 @@ func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap
return err return err
} }
log.Printf("Got initial state from etcd: %+v", manifests) log.Printf("Got initial state from etcd: %+v", manifests)
changeChannel <- manifests updateChannel <- manifestUpdate{etcdSource, manifests}
return nil return nil
} }
// Sync with etcd, and set up an etcd watch for new configurations // Sync with etcd, and set up an etcd watch for new configurations
// The channel to send new configurations across // The channel to send new configurations across
// This function loops forever and is intended to be run in a go routine. // This function loops forever and is intended to be run in a go routine.
func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) { func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) {
key := "/registry/hosts/" + strings.TrimSpace(kl.Hostname) key := "/registry/hosts/" + strings.TrimSpace(kl.Hostname)
// First fetch the initial configuration (watch only gives changes...) // First fetch the initial configuration (watch only gives changes...)
for { for {
err := kl.getKubeletStateFromEtcd(key, changeChannel) err := kl.getKubeletStateFromEtcd(key, updateChannel)
if err == nil { if err == nil {
// We got a successful response, etcd is up, set up the watch. // We got a successful response, etcd is up, set up the watch.
break break
@ -542,9 +554,9 @@ func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerMan
watchChannel := make(chan *etcd.Response) watchChannel := make(chan *etcd.Response)
// We don't push this through Forever because if it dies, we just do it again in 30 secs. // We don't push this through Forever because if it dies, we just do it again in 30 secs.
// anyway. // anyway.
go kl.WatchEtcd(watchChannel, changeChannel) go kl.WatchEtcd(watchChannel, updateChannel)
kl.getKubeletStateFromEtcd(key, changeChannel) kl.getKubeletStateFromEtcd(key, updateChannel)
log.Printf("Setting up a watch for configuration changes in etcd for %s", key) log.Printf("Setting up a watch for configuration changes in etcd for %s", key)
kl.Client.Watch(key, 0, true, watchChannel, done) kl.Client.Watch(key, 0, true, watchChannel, done)
} }
@ -579,7 +591,7 @@ func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerMani
// Watch etcd for changes, receives config objects from the etcd client watch. // Watch etcd for changes, receives config objects from the etcd client watch.
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine. // This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) { func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) {
defer util.HandleCrash() defer util.HandleCrash()
for { for {
watchResponse := <-watchChannel watchResponse := <-watchChannel
@ -596,7 +608,7 @@ func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel c
log.Printf("manifests: %#v", manifests) log.Printf("manifests: %#v", manifests)
// Ok, we have a valid configuration, send to channel for // Ok, we have a valid configuration, send to channel for
// rejiggering. // rejiggering.
changeChannel <- manifests updateChannel <- manifestUpdate{etcdSource, manifests}
} }
} }
@ -710,29 +722,20 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
// no changes are seen to the configuration, will synchronize the last known desired // no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds. // state every sync_frequency seconds.
// Never returns. // Never returns.
func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel <-chan []api.ContainerManifest, handler SyncHandler) { func (kl *Kubelet) RunSyncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) {
var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest last := make(map[string][]api.ContainerManifest)
for { for {
select { select {
case manifests := <-fileChannel: case u := <-updateChannel:
log.Printf("Got new configuration from file/dir... %v", manifests) log.Printf("Got configuration from %s: %#v", u.source, u.manifests)
lastFile = manifests last[u.source] = u.manifests
case manifests := <-etcdChannel:
log.Printf("Got new configuration from etcd... %v", manifests)
lastEtcd = manifests
case manifests := <-httpChannel:
log.Printf("Got new configuration from external http... %v", manifests)
lastHttp = manifests
case manifests := <-serverChannel:
log.Printf("Got new configuration from our server... %v", manifests)
lastServer = manifests
case <-time.After(kl.SyncFrequency): case <-time.After(kl.SyncFrequency):
} }
manifests := append([]api.ContainerManifest{}, lastFile...) manifests := []api.ContainerManifest{}
manifests = append(manifests, lastEtcd...) for _, m := range last {
manifests = append(manifests, lastHttp...) manifests = append(manifests, m...)
manifests = append(manifests, lastServer...) }
err := handler.SyncManifests(manifests) err := handler.SyncManifests(manifests)
if err != nil { if err != nil {

View File

@ -29,7 +29,7 @@ import (
type KubeletServer struct { type KubeletServer struct {
Kubelet kubeletInterface Kubelet kubeletInterface
UpdateChannel chan []api.ContainerManifest UpdateChannel chan manifestUpdate
} }
// kubeletInterface contains all the kubelet methods required by the server. // kubeletInterface contains all the kubelet methods required by the server.
@ -67,7 +67,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.error(w, err) s.error(w, err)
return return
} }
s.UpdateChannel <- []api.ContainerManifest{manifest} s.UpdateChannel <- manifestUpdate{httpServerSource, []api.ContainerManifest{manifest}}
} else if u.Path == "/containers" { } else if u.Path == "/containers" {
var manifests []api.ContainerManifest var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests) err = yaml.Unmarshal(data, &manifests)
@ -75,7 +75,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.error(w, err) s.error(w, err)
return return
} }
s.UpdateChannel <- manifests s.UpdateChannel <- manifestUpdate{httpServerSource, manifests}
} }
case u.Path == "/containerStats": case u.Path == "/containerStats":
container := u.Query().Get("container") container := u.Query().Get("container")

View File

@ -33,7 +33,7 @@ func (fk *fakeKubelet) GetContainerStats(name string) (*api.ContainerStats, erro
} }
type serverTestFramework struct { type serverTestFramework struct {
updateChan chan []api.ContainerManifest updateChan chan manifestUpdate
updateReader *channelReader updateReader *channelReader
serverUnderTest *KubeletServer serverUnderTest *KubeletServer
fakeKubelet *fakeKubelet fakeKubelet *fakeKubelet
@ -42,7 +42,7 @@ type serverTestFramework struct {
func makeServerTest() *serverTestFramework { func makeServerTest() *serverTestFramework {
fw := &serverTestFramework{ fw := &serverTestFramework{
updateChan: make(chan []api.ContainerManifest), updateChan: make(chan manifestUpdate),
} }
fw.updateReader = startReading(fw.updateChan) fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{} fw.fakeKubelet = &fakeKubelet{}

View File

@ -375,16 +375,16 @@ type channelReader struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
func startReading(channel <-chan []api.ContainerManifest) *channelReader { func startReading(channel <-chan manifestUpdate) *channelReader {
cr := &channelReader{} cr := &channelReader{}
cr.wg.Add(1) cr.wg.Add(1)
go func() { go func() {
for { for {
containers, ok := <-channel update, ok := <-channel
if !ok { if !ok {
break break
} }
cr.list = append(cr.list, containers) cr.list = append(cr.list, update.manifests)
} }
cr.wg.Done() cr.wg.Done()
}() }()
@ -401,7 +401,7 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan manifestUpdate)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
@ -423,7 +423,7 @@ func TestGetKubeletStateFromEtcd(t *testing.T) {
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan manifestUpdate)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
@ -447,7 +447,7 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan manifestUpdate)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
@ -469,7 +469,7 @@ func TestGetKubeletStateFromEtcdError(t *testing.T) {
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan manifestUpdate)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
@ -811,14 +811,14 @@ func TestExtractFromDir(t *testing.T) {
func TestExtractFromHttpBadness(t *testing.T) { func TestExtractFromHttpBadness(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan []api.ContainerManifest) updateChannel := make(chan manifestUpdate)
reader := startReading(changeChannel) reader := startReading(updateChannel)
err := kubelet.extractFromHTTP("http://localhost:12345", changeChannel) err := kubelet.extractFromHTTP("http://localhost:12345", updateChannel)
if err == nil { if err == nil {
t.Error("Unexpected non-error.") t.Error("Unexpected non-error.")
} }
close(changeChannel) close(updateChannel)
list := reader.GetList() list := reader.GetList()
if len(list) != 0 { if len(list) != 0 {
@ -828,8 +828,8 @@ func TestExtractFromHttpBadness(t *testing.T) {
func TestExtractFromHttp(t *testing.T) { func TestExtractFromHttp(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan []api.ContainerManifest) updateChannel := make(chan manifestUpdate)
reader := startReading(changeChannel) reader := startReading(updateChannel)
manifests := []api.ContainerManifest{ manifests := []api.ContainerManifest{
{Id: "foo"}, {Id: "foo"},
@ -842,11 +842,11 @@ func TestExtractFromHttp(t *testing.T) {
} }
testServer := httptest.NewServer(&fakeHandler) testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, changeChannel) err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil { if err != nil {
t.Errorf("Unexpected error: %#v", err) t.Errorf("Unexpected error: %#v", err)
} }
close(changeChannel) close(updateChannel)
read := reader.GetList() read := reader.GetList()
@ -860,9 +860,9 @@ func TestExtractFromHttp(t *testing.T) {
func TestWatchEtcd(t *testing.T) { func TestWatchEtcd(t *testing.T) {
watchChannel := make(chan *etcd.Response) watchChannel := make(chan *etcd.Response)
changeChannel := make(chan []api.ContainerManifest) updateChannel := make(chan manifestUpdate)
kubelet := Kubelet{} kubelet := Kubelet{}
reader := startReading(changeChannel) reader := startReading(updateChannel)
manifest := []api.ContainerManifest{ manifest := []api.ContainerManifest{
{ {
@ -872,7 +872,7 @@ func TestWatchEtcd(t *testing.T) {
data, err := json.Marshal(manifest) data, err := json.Marshal(manifest)
expectNoError(t, err) expectNoError(t, err)
go kubelet.WatchEtcd(watchChannel, changeChannel) go kubelet.WatchEtcd(watchChannel, updateChannel)
watchChannel <- &etcd.Response{ watchChannel <- &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
@ -880,7 +880,7 @@ func TestWatchEtcd(t *testing.T) {
}, },
} }
close(watchChannel) close(watchChannel)
close(changeChannel) close(updateChannel)
read := reader.GetList() read := reader.GetList()
if len(read) != 1 || if len(read) != 1 ||