From 1441a84673bfc62d24c2598c0f6e124ddf904035 Mon Sep 17 00:00:00 2001 From: Justin Huff Date: Sat, 21 Jun 2014 14:20:35 -0700 Subject: [PATCH] Cleanup handling of config channels in RunSyncLoop by passing a map instead of a bunch of vars. --- pkg/kubelet/kubelet.go | 99 +++++++++++++++--------------- pkg/kubelet/kubelet_server.go | 6 +- pkg/kubelet/kubelet_server_test.go | 4 +- pkg/kubelet/kubelet_test.go | 38 ++++++------ 4 files changed, 75 insertions(+), 72 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 05b8cafcd44..f7ac1d206c8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -77,31 +77,39 @@ type Kubelet struct { pullLock sync.Mutex } +type manifestUpdate struct { + source string + manifests []api.ContainerManifest +} + +const ( + fileSource = "file" + etcdSource = "etcd" + httpClientSource = "http_client" + httpServerSource = "http_server" +) + // Starts background goroutines. If file, manifest_url, or address are empty, // they are not watched. Never returns. func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { - fileChannel := make(chan []api.ContainerManifest) - etcdChannel := make(chan []api.ContainerManifest) - httpChannel := make(chan []api.ContainerManifest) - serverChannel := make(chan []api.ContainerManifest) - + updateChannel := make(chan manifestUpdate) if file != "" { - go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) + go util.Forever(func() { kl.WatchFile(file, updateChannel) }, 20*time.Second) } if manifest_url != "" { - go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) + go util.Forever(func() { kl.WatchHTTP(manifest_url, updateChannel) }, 20*time.Second) } if etcd_servers != "" { servers := []string{etcd_servers} log.Printf("Creating etcd client pointing to %v", servers) kl.Client = etcd.NewClient(servers) - go util.Forever(func() { kl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second) + go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second) } if address != "" { log.Printf("Starting to listen on %s:%d", address, port) handler := KubeletServer{ Kubelet: kl, - UpdateChannel: serverChannel, + UpdateChannel: updateChannel, } s := &http.Server{ // TODO: This is broken if address is an ipv6 address. @@ -113,7 +121,7 @@ func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, } go util.Forever(func() { s.ListenAndServe() }, 0) } - kl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, kl) + kl.RunSyncLoop(updateChannel, kl) } // Interface implemented by Kubelet, for testability @@ -413,23 +421,22 @@ func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) return manifests, nil } -func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { +func (kl *Kubelet) extractMultipleFromReader(reader io.Reader) ([]api.ContainerManifest, error) { var manifests []api.ContainerManifest data, err := ioutil.ReadAll(reader) if err != nil { log.Printf("Couldn't read from reader: %v", err) - return err + return manifests, err } if err = kl.ExtractYAMLData(data, &manifests); err != nil { - return err + return manifests, err } - changeChannel <- manifests - return nil + return manifests, nil } // Watch a file for changes to the set of pods that should run on this Kubelet // This function loops forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { +func (kl *Kubelet) WatchFile(file string, updateChannel chan<- manifestUpdate) { for { var err error @@ -446,19 +453,19 @@ func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerMa log.Printf("Error polling dir: %#v", err) continue } - changeChannel <- manifests + updateChannel <- manifestUpdate{fileSource, manifests} } else { manifest, err := kl.extractFromFile(file) if err != nil { log.Printf("Error polling file: %#v", err) continue } - changeChannel <- []api.ContainerManifest{manifest} + updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}} } } } -func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.ContainerManifest) error { +func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error { client := &http.Client{} request, err := http.NewRequest("GET", url, nil) if err != nil { @@ -469,16 +476,21 @@ func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.Contai return err } defer response.Body.Close() - return kl.extractMultipleFromReader(response.Body, changeChannel) + manifests, err := kl.extractMultipleFromReader(response.Body) + if err != nil { + return err + } + updateChannel <- manifestUpdate{httpClientSource, manifests} + return nil } // Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet // This function runs forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- []api.ContainerManifest) { +func (kl *Kubelet) WatchHTTP(url string, updateChannel chan<- manifestUpdate) { for { var err error time.Sleep(kl.HTTPCheckFrequency) - err = kl.extractFromHTTP(url, changeChannel) + err = kl.extractFromHTTP(url, updateChannel) if err != nil { log.Printf("Error syncing http: %#v", err) } @@ -496,7 +508,7 @@ func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.Container return manifests, err } -func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error { +func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error { response, err := kl.Client.Get(key+"/kubelet", true, false) if err != nil { log.Printf("Error on get on %s: %#v", key, err) @@ -515,18 +527,18 @@ func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap return err } log.Printf("Got initial state from etcd: %+v", manifests) - changeChannel <- manifests + updateChannel <- manifestUpdate{etcdSource, manifests} return nil } // Sync with etcd, and set up an etcd watch for new configurations // The channel to send new configurations across // This function loops forever and is intended to be run in a go routine. -func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) { +func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) { key := "/registry/hosts/" + strings.TrimSpace(kl.Hostname) // First fetch the initial configuration (watch only gives changes...) for { - err := kl.getKubeletStateFromEtcd(key, changeChannel) + err := kl.getKubeletStateFromEtcd(key, updateChannel) if err == nil { // We got a successful response, etcd is up, set up the watch. break @@ -542,9 +554,9 @@ func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerMan watchChannel := make(chan *etcd.Response) // We don't push this through Forever because if it dies, we just do it again in 30 secs. // anyway. - go kl.WatchEtcd(watchChannel, changeChannel) + go kl.WatchEtcd(watchChannel, updateChannel) - kl.getKubeletStateFromEtcd(key, changeChannel) + kl.getKubeletStateFromEtcd(key, updateChannel) log.Printf("Setting up a watch for configuration changes in etcd for %s", key) kl.Client.Watch(key, 0, true, watchChannel, done) } @@ -579,7 +591,7 @@ func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerMani // Watch etcd for changes, receives config objects from the etcd client watch. // This function loops until the watchChannel is closed, and is intended to be run as a goroutine. -func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) { +func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) { defer util.HandleCrash() for { watchResponse := <-watchChannel @@ -596,7 +608,7 @@ func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel c log.Printf("manifests: %#v", manifests) // Ok, we have a valid configuration, send to channel for // rejiggering. - changeChannel <- manifests + updateChannel <- manifestUpdate{etcdSource, manifests} } } @@ -710,29 +722,20 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { // no changes are seen to the configuration, will synchronize the last known desired // state every sync_frequency seconds. // Never returns. -func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel <-chan []api.ContainerManifest, handler SyncHandler) { - var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest +func (kl *Kubelet) RunSyncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) { + last := make(map[string][]api.ContainerManifest) for { select { - case manifests := <-fileChannel: - log.Printf("Got new configuration from file/dir... %v", manifests) - lastFile = manifests - case manifests := <-etcdChannel: - log.Printf("Got new configuration from etcd... %v", manifests) - lastEtcd = manifests - case manifests := <-httpChannel: - log.Printf("Got new configuration from external http... %v", manifests) - lastHttp = manifests - case manifests := <-serverChannel: - log.Printf("Got new configuration from our server... %v", manifests) - lastServer = manifests + case u := <-updateChannel: + log.Printf("Got configuration from %s: %#v", u.source, u.manifests) + last[u.source] = u.manifests case <-time.After(kl.SyncFrequency): } - manifests := append([]api.ContainerManifest{}, lastFile...) - manifests = append(manifests, lastEtcd...) - manifests = append(manifests, lastHttp...) - manifests = append(manifests, lastServer...) + manifests := []api.ContainerManifest{} + for _, m := range last { + manifests = append(manifests, m...) + } err := handler.SyncManifests(manifests) if err != nil { diff --git a/pkg/kubelet/kubelet_server.go b/pkg/kubelet/kubelet_server.go index 34ae0c9bc42..cc8f9c69c05 100644 --- a/pkg/kubelet/kubelet_server.go +++ b/pkg/kubelet/kubelet_server.go @@ -29,7 +29,7 @@ import ( type KubeletServer struct { Kubelet kubeletInterface - UpdateChannel chan []api.ContainerManifest + UpdateChannel chan manifestUpdate } // kubeletInterface contains all the kubelet methods required by the server. @@ -67,7 +67,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.error(w, err) return } - s.UpdateChannel <- []api.ContainerManifest{manifest} + s.UpdateChannel <- manifestUpdate{httpServerSource, []api.ContainerManifest{manifest}} } else if u.Path == "/containers" { var manifests []api.ContainerManifest err = yaml.Unmarshal(data, &manifests) @@ -75,7 +75,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.error(w, err) return } - s.UpdateChannel <- manifests + s.UpdateChannel <- manifestUpdate{httpServerSource, manifests} } case u.Path == "/containerStats": container := u.Query().Get("container") diff --git a/pkg/kubelet/kubelet_server_test.go b/pkg/kubelet/kubelet_server_test.go index c8c026d1314..884f2b43707 100644 --- a/pkg/kubelet/kubelet_server_test.go +++ b/pkg/kubelet/kubelet_server_test.go @@ -33,7 +33,7 @@ func (fk *fakeKubelet) GetContainerStats(name string) (*api.ContainerStats, erro } type serverTestFramework struct { - updateChan chan []api.ContainerManifest + updateChan chan manifestUpdate updateReader *channelReader serverUnderTest *KubeletServer fakeKubelet *fakeKubelet @@ -42,7 +42,7 @@ type serverTestFramework struct { func makeServerTest() *serverTestFramework { fw := &serverTestFramework{ - updateChan: make(chan []api.ContainerManifest), + updateChan: make(chan manifestUpdate), } fw.updateReader = startReading(fw.updateChan) fw.fakeKubelet = &fakeKubelet{} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index fd395640652..32b75ab4fe9 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -375,16 +375,16 @@ type channelReader struct { wg sync.WaitGroup } -func startReading(channel <-chan []api.ContainerManifest) *channelReader { +func startReading(channel <-chan manifestUpdate) *channelReader { cr := &channelReader{} cr.wg.Add(1) go func() { for { - containers, ok := <-channel + update, ok := <-channel if !ok { break } - cr.list = append(cr.list, containers) + cr.list = append(cr.list, update.manifests) } cr.wg.Done() }() @@ -401,7 +401,7 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) { kubelet := Kubelet{ Client: fakeClient, } - channel := make(chan []api.ContainerManifest) + channel := make(chan manifestUpdate) reader := startReading(channel) fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{}, @@ -423,7 +423,7 @@ func TestGetKubeletStateFromEtcd(t *testing.T) { kubelet := Kubelet{ Client: fakeClient, } - channel := make(chan []api.ContainerManifest) + channel := make(chan manifestUpdate) reader := startReading(channel) fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{ @@ -447,7 +447,7 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) { kubelet := Kubelet{ Client: fakeClient, } - channel := make(chan []api.ContainerManifest) + channel := make(chan manifestUpdate) reader := startReading(channel) fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{}, @@ -469,7 +469,7 @@ func TestGetKubeletStateFromEtcdError(t *testing.T) { kubelet := Kubelet{ Client: fakeClient, } - channel := make(chan []api.ContainerManifest) + channel := make(chan manifestUpdate) reader := startReading(channel) fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ R: &etcd.Response{}, @@ -811,14 +811,14 @@ func TestExtractFromDir(t *testing.T) { func TestExtractFromHttpBadness(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) + updateChannel := make(chan manifestUpdate) + reader := startReading(updateChannel) - err := kubelet.extractFromHTTP("http://localhost:12345", changeChannel) + err := kubelet.extractFromHTTP("http://localhost:12345", updateChannel) if err == nil { t.Error("Unexpected non-error.") } - close(changeChannel) + close(updateChannel) list := reader.GetList() if len(list) != 0 { @@ -828,8 +828,8 @@ func TestExtractFromHttpBadness(t *testing.T) { func TestExtractFromHttp(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) + updateChannel := make(chan manifestUpdate) + reader := startReading(updateChannel) manifests := []api.ContainerManifest{ {Id: "foo"}, @@ -842,11 +842,11 @@ func TestExtractFromHttp(t *testing.T) { } testServer := httptest.NewServer(&fakeHandler) - err = kubelet.extractFromHTTP(testServer.URL, changeChannel) + err = kubelet.extractFromHTTP(testServer.URL, updateChannel) if err != nil { t.Errorf("Unexpected error: %#v", err) } - close(changeChannel) + close(updateChannel) read := reader.GetList() @@ -860,9 +860,9 @@ func TestExtractFromHttp(t *testing.T) { func TestWatchEtcd(t *testing.T) { watchChannel := make(chan *etcd.Response) - changeChannel := make(chan []api.ContainerManifest) + updateChannel := make(chan manifestUpdate) kubelet := Kubelet{} - reader := startReading(changeChannel) + reader := startReading(updateChannel) manifest := []api.ContainerManifest{ { @@ -872,7 +872,7 @@ func TestWatchEtcd(t *testing.T) { data, err := json.Marshal(manifest) expectNoError(t, err) - go kubelet.WatchEtcd(watchChannel, changeChannel) + go kubelet.WatchEtcd(watchChannel, updateChannel) watchChannel <- &etcd.Response{ Node: &etcd.Node{ @@ -880,7 +880,7 @@ func TestWatchEtcd(t *testing.T) { }, } close(watchChannel) - close(changeChannel) + close(updateChannel) read := reader.GetList() if len(read) != 1 ||