Merge pull request #201 from jjhuff/kubelet_config_cleanup

Cleanup handling of config channels in RunSyncLoop
This commit is contained in:
Daniel Smith 2014-06-22 11:02:58 -07:00
commit db3ffe6d2b
4 changed files with 75 additions and 72 deletions

View File

@ -77,31 +77,39 @@ type Kubelet struct {
pullLock sync.Mutex
}
type manifestUpdate struct {
source string
manifests []api.ContainerManifest
}
const (
fileSource = "file"
etcdSource = "etcd"
httpClientSource = "http_client"
httpServerSource = "http_server"
)
// Starts background goroutines. If file, manifest_url, or address are empty,
// they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {
fileChannel := make(chan []api.ContainerManifest)
etcdChannel := make(chan []api.ContainerManifest)
httpChannel := make(chan []api.ContainerManifest)
serverChannel := make(chan []api.ContainerManifest)
updateChannel := make(chan manifestUpdate)
if file != "" {
go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second)
go util.Forever(func() { kl.WatchFile(file, updateChannel) }, 20*time.Second)
}
if manifest_url != "" {
go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second)
go util.Forever(func() { kl.WatchHTTP(manifest_url, updateChannel) }, 20*time.Second)
}
if etcd_servers != "" {
servers := []string{etcd_servers}
log.Printf("Creating etcd client pointing to %v", servers)
kl.Client = etcd.NewClient(servers)
go util.Forever(func() { kl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second)
go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second)
}
if address != "" {
log.Printf("Starting to listen on %s:%d", address, port)
handler := KubeletServer{
Kubelet: kl,
UpdateChannel: serverChannel,
UpdateChannel: updateChannel,
}
s := &http.Server{
// TODO: This is broken if address is an ipv6 address.
@ -113,7 +121,7 @@ func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string,
}
go util.Forever(func() { s.ListenAndServe() }, 0)
}
kl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, kl)
kl.RunSyncLoop(updateChannel, kl)
}
// Interface implemented by Kubelet, for testability
@ -413,23 +421,22 @@ func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error)
return manifests, nil
}
func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error {
func (kl *Kubelet) extractMultipleFromReader(reader io.Reader) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest
data, err := ioutil.ReadAll(reader)
if err != nil {
log.Printf("Couldn't read from reader: %v", err)
return err
return manifests, err
}
if err = kl.ExtractYAMLData(data, &manifests); err != nil {
return err
return manifests, err
}
changeChannel <- manifests
return nil
return manifests, nil
}
// Watch a file for changes to the set of pods that should run on this Kubelet
// This function loops forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) {
func (kl *Kubelet) WatchFile(file string, updateChannel chan<- manifestUpdate) {
for {
var err error
@ -446,19 +453,19 @@ func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerMa
log.Printf("Error polling dir: %#v", err)
continue
}
changeChannel <- manifests
updateChannel <- manifestUpdate{fileSource, manifests}
} else {
manifest, err := kl.extractFromFile(file)
if err != nil {
log.Printf("Error polling file: %#v", err)
continue
}
changeChannel <- []api.ContainerManifest{manifest}
updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
}
}
}
func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.ContainerManifest) error {
func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
client := &http.Client{}
request, err := http.NewRequest("GET", url, nil)
if err != nil {
@ -469,16 +476,21 @@ func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.Contai
return err
}
defer response.Body.Close()
return kl.extractMultipleFromReader(response.Body, changeChannel)
manifests, err := kl.extractMultipleFromReader(response.Body)
if err != nil {
return err
}
updateChannel <- manifestUpdate{httpClientSource, manifests}
return nil
}
// Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet
// This function runs forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- []api.ContainerManifest) {
func (kl *Kubelet) WatchHTTP(url string, updateChannel chan<- manifestUpdate) {
for {
var err error
time.Sleep(kl.HTTPCheckFrequency)
err = kl.extractFromHTTP(url, changeChannel)
err = kl.extractFromHTTP(url, updateChannel)
if err != nil {
log.Printf("Error syncing http: %#v", err)
}
@ -496,7 +508,7 @@ func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.Container
return manifests, err
}
func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error {
func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
response, err := kl.Client.Get(key+"/kubelet", true, false)
if err != nil {
log.Printf("Error on get on %s: %#v", key, err)
@ -515,18 +527,18 @@ func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap
return err
}
log.Printf("Got initial state from etcd: %+v", manifests)
changeChannel <- manifests
updateChannel <- manifestUpdate{etcdSource, manifests}
return nil
}
// Sync with etcd, and set up an etcd watch for new configurations
// The channel to send new configurations across
// This function loops forever and is intended to be run in a go routine.
func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) {
func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) {
key := "/registry/hosts/" + strings.TrimSpace(kl.Hostname)
// First fetch the initial configuration (watch only gives changes...)
for {
err := kl.getKubeletStateFromEtcd(key, changeChannel)
err := kl.getKubeletStateFromEtcd(key, updateChannel)
if err == nil {
// We got a successful response, etcd is up, set up the watch.
break
@ -542,9 +554,9 @@ func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerMan
watchChannel := make(chan *etcd.Response)
// We don't push this through Forever because if it dies, we just do it again in 30 secs.
// anyway.
go kl.WatchEtcd(watchChannel, changeChannel)
go kl.WatchEtcd(watchChannel, updateChannel)
kl.getKubeletStateFromEtcd(key, changeChannel)
kl.getKubeletStateFromEtcd(key, updateChannel)
log.Printf("Setting up a watch for configuration changes in etcd for %s", key)
kl.Client.Watch(key, 0, true, watchChannel, done)
}
@ -579,7 +591,7 @@ func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerMani
// Watch etcd for changes, receives config objects from the etcd client watch.
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) {
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) {
defer util.HandleCrash()
for {
watchResponse := <-watchChannel
@ -596,7 +608,7 @@ func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel c
log.Printf("manifests: %#v", manifests)
// Ok, we have a valid configuration, send to channel for
// rejiggering.
changeChannel <- manifests
updateChannel <- manifestUpdate{etcdSource, manifests}
}
}
@ -710,29 +722,20 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds.
// Never returns.
func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel <-chan []api.ContainerManifest, handler SyncHandler) {
var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest
func (kl *Kubelet) RunSyncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) {
last := make(map[string][]api.ContainerManifest)
for {
select {
case manifests := <-fileChannel:
log.Printf("Got new configuration from file/dir... %v", manifests)
lastFile = manifests
case manifests := <-etcdChannel:
log.Printf("Got new configuration from etcd... %v", manifests)
lastEtcd = manifests
case manifests := <-httpChannel:
log.Printf("Got new configuration from external http... %v", manifests)
lastHttp = manifests
case manifests := <-serverChannel:
log.Printf("Got new configuration from our server... %v", manifests)
lastServer = manifests
case u := <-updateChannel:
log.Printf("Got configuration from %s: %#v", u.source, u.manifests)
last[u.source] = u.manifests
case <-time.After(kl.SyncFrequency):
}
manifests := append([]api.ContainerManifest{}, lastFile...)
manifests = append(manifests, lastEtcd...)
manifests = append(manifests, lastHttp...)
manifests = append(manifests, lastServer...)
manifests := []api.ContainerManifest{}
for _, m := range last {
manifests = append(manifests, m...)
}
err := handler.SyncManifests(manifests)
if err != nil {

View File

@ -29,7 +29,7 @@ import (
type KubeletServer struct {
Kubelet kubeletInterface
UpdateChannel chan []api.ContainerManifest
UpdateChannel chan manifestUpdate
}
// kubeletInterface contains all the kubelet methods required by the server.
@ -67,7 +67,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.error(w, err)
return
}
s.UpdateChannel <- []api.ContainerManifest{manifest}
s.UpdateChannel <- manifestUpdate{httpServerSource, []api.ContainerManifest{manifest}}
} else if u.Path == "/containers" {
var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests)
@ -75,7 +75,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.error(w, err)
return
}
s.UpdateChannel <- manifests
s.UpdateChannel <- manifestUpdate{httpServerSource, manifests}
}
case u.Path == "/containerStats":
container := u.Query().Get("container")

View File

@ -33,7 +33,7 @@ func (fk *fakeKubelet) GetContainerStats(name string) (*api.ContainerStats, erro
}
type serverTestFramework struct {
updateChan chan []api.ContainerManifest
updateChan chan manifestUpdate
updateReader *channelReader
serverUnderTest *KubeletServer
fakeKubelet *fakeKubelet
@ -42,7 +42,7 @@ type serverTestFramework struct {
func makeServerTest() *serverTestFramework {
fw := &serverTestFramework{
updateChan: make(chan []api.ContainerManifest),
updateChan: make(chan manifestUpdate),
}
fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{}

View File

@ -375,16 +375,16 @@ type channelReader struct {
wg sync.WaitGroup
}
func startReading(channel <-chan []api.ContainerManifest) *channelReader {
func startReading(channel <-chan manifestUpdate) *channelReader {
cr := &channelReader{}
cr.wg.Add(1)
go func() {
for {
containers, ok := <-channel
update, ok := <-channel
if !ok {
break
}
cr.list = append(cr.list, containers)
cr.list = append(cr.list, update.manifests)
}
cr.wg.Done()
}()
@ -401,7 +401,7 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
kubelet := Kubelet{
Client: fakeClient,
}
channel := make(chan []api.ContainerManifest)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{},
@ -423,7 +423,7 @@ func TestGetKubeletStateFromEtcd(t *testing.T) {
kubelet := Kubelet{
Client: fakeClient,
}
channel := make(chan []api.ContainerManifest)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{
@ -447,7 +447,7 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
kubelet := Kubelet{
Client: fakeClient,
}
channel := make(chan []api.ContainerManifest)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{},
@ -469,7 +469,7 @@ func TestGetKubeletStateFromEtcdError(t *testing.T) {
kubelet := Kubelet{
Client: fakeClient,
}
channel := make(chan []api.ContainerManifest)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{},
@ -811,14 +811,14 @@ func TestExtractFromDir(t *testing.T) {
func TestExtractFromHttpBadness(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan []api.ContainerManifest)
reader := startReading(changeChannel)
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
err := kubelet.extractFromHTTP("http://localhost:12345", changeChannel)
err := kubelet.extractFromHTTP("http://localhost:12345", updateChannel)
if err == nil {
t.Error("Unexpected non-error.")
}
close(changeChannel)
close(updateChannel)
list := reader.GetList()
if len(list) != 0 {
@ -828,8 +828,8 @@ func TestExtractFromHttpBadness(t *testing.T) {
func TestExtractFromHttp(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan []api.ContainerManifest)
reader := startReading(changeChannel)
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
manifests := []api.ContainerManifest{
{Id: "foo"},
@ -842,11 +842,11 @@ func TestExtractFromHttp(t *testing.T) {
}
testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, changeChannel)
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
close(changeChannel)
close(updateChannel)
read := reader.GetList()
@ -860,9 +860,9 @@ func TestExtractFromHttp(t *testing.T) {
func TestWatchEtcd(t *testing.T) {
watchChannel := make(chan *etcd.Response)
changeChannel := make(chan []api.ContainerManifest)
updateChannel := make(chan manifestUpdate)
kubelet := Kubelet{}
reader := startReading(changeChannel)
reader := startReading(updateChannel)
manifest := []api.ContainerManifest{
{
@ -872,7 +872,7 @@ func TestWatchEtcd(t *testing.T) {
data, err := json.Marshal(manifest)
expectNoError(t, err)
go kubelet.WatchEtcd(watchChannel, changeChannel)
go kubelet.WatchEtcd(watchChannel, updateChannel)
watchChannel <- &etcd.Response{
Node: &etcd.Node{
@ -880,7 +880,7 @@ func TestWatchEtcd(t *testing.T) {
},
}
close(watchChannel)
close(changeChannel)
close(updateChannel)
read := reader.GetList()
if len(read) != 1 ||