From 5736cc8d8ab3e91ba62d322e10693c3f6519a2bf Mon Sep 17 00:00:00 2001 From: Justin Huff Date: Wed, 18 Jun 2014 15:53:04 -0700 Subject: [PATCH 1/5] Pass through args to localkube --- hack/local-up.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hack/local-up.sh b/hack/local-up.sh index ff1ee378772..7cd2b110048 100755 --- a/hack/local-up.sh +++ b/hack/local-up.sh @@ -46,6 +46,6 @@ ETCD_PID=$! sleep 5 echo "Running localkube as root (so it can talk to docker's unix socket)" -sudo $(dirname $0)/../output/go/localkube +sudo $(dirname $0)/../output/go/localkube $* kill $ETCD_PID From f49b9c2429dee672a7e738a58bb62d7b503f94a3 Mon Sep 17 00:00:00 2001 From: Justin Huff Date: Fri, 20 Jun 2014 09:31:18 -0700 Subject: [PATCH 2/5] Fix merge conflicts --- pkg/kubelet/kubelet.go | 88 +++++++++-------- pkg/kubelet/kubelet_server.go | 27 ++++-- pkg/kubelet/kubelet_server_test.go | 65 ++++++------- pkg/kubelet/kubelet_test.go | 147 ++++++++++------------------- 4 files changed, 148 insertions(+), 179 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a1c7dac82d9..fc7236a7235 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -17,7 +17,6 @@ limitations under the License. package kubelet import ( - "bytes" "encoding/json" "fmt" "io" @@ -79,12 +78,14 @@ type Kubelet struct { // Starts background goroutines. If file, manifest_url, or address are empty, // they are not watched. Never returns. func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { - fileChannel := make(chan api.ContainerManifest) + fileChannel := make(chan []api.ContainerManifest) etcdChannel := make(chan []api.ContainerManifest) - httpChannel := make(chan api.ContainerManifest) - serverChannel := make(chan api.ContainerManifest) + httpChannel := make(chan []api.ContainerManifest) + serverChannel := make(chan []api.ContainerManifest) - go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) + if file != "" { + go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) + } if manifest_url != "" { go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) } @@ -363,72 +364,78 @@ func (kl *Kubelet) KillContainer(name string) error { return err } -func (kl *Kubelet) extractFromFile(lastData []byte, name string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromFile(name string, changeChannel chan<- []api.ContainerManifest) error { var file *os.File var err error if file, err = os.Open(name); err != nil { - return lastData, err + return err } - return kl.extractFromReader(lastData, file, changeChannel) + return kl.extractFromReader(file, changeChannel) } -func (kl *Kubelet) extractFromReader(lastData []byte, reader io.Reader, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { var manifest api.ContainerManifest data, err := ioutil.ReadAll(reader) if err != nil { - log.Printf("Couldn't read file: %v", err) - return lastData, err + log.Printf("Couldn't read from reader: %v", err) + return err } if err = kl.ExtractYAMLData(data, &manifest); err != nil { - return lastData, err + return err } - if !bytes.Equal(lastData, data) { - lastData = data - // Ok, we have a valid configuration, send to channel for - // rejiggering. - changeChannel <- manifest - return data, nil + changeChannel <- []api.ContainerManifest{manifest} + return nil +} + +func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { + var manifests []api.ContainerManifest + data, err := ioutil.ReadAll(reader) + if err != nil { + log.Printf("Couldn't read from reader: %v", err) + return err } - return lastData, nil + if err = kl.ExtractYAMLData(data, &manifests); err != nil { + return err + } + changeChannel <- manifests + return nil } // Watch a file for changes to the set of pods that should run on this Kubelet // This function loops forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) { - var lastData []byte +func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { for { var err error time.Sleep(kl.FileCheckFrequency) - lastData, err = kl.extractFromFile(lastData, file, changeChannel) + err = kl.extractFromFile(file, changeChannel) if err != nil { log.Printf("Error polling file: %#v", err) } } } -func (kl *Kubelet) extractFromHTTP(lastData []byte, url string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.ContainerManifest) error { client := &http.Client{} request, err := http.NewRequest("GET", url, nil) if err != nil { - return lastData, err + return err } response, err := client.Do(request) if err != nil { - return lastData, err + return err } defer response.Body.Close() - return kl.extractFromReader(lastData, response.Body, changeChannel) + return kl.extractMultipleFromReader(response.Body, changeChannel) } // Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet // This function runs forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) { - var lastData []byte +func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- []api.ContainerManifest) { for { var err error time.Sleep(kl.HTTPCheckFrequency) - lastData, err = kl.extractFromHTTP(lastData, url, changeChannel) + err = kl.extractFromHTTP(url, changeChannel) if err != nil { log.Printf("Error syncing http: %#v", err) } @@ -655,27 +662,27 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { } // runSyncLoop is the main loop for processing changes. It watches for changes from -// four channels (file, etcd, server, and http) and creates a union of the two. For +// four channels (file, etcd, server, and http) and creates a union of them. For // any new change seen, will run a sync against desired state and running state. If // no changes are seen to the configuration, will synchronize the last known desired // state every sync_frequency seconds. // Never returns. -func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) { +func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel <-chan []api.ContainerManifest, handler SyncHandler) { var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest for { select { - case manifest := <-fileChannel: - log.Printf("Got new manifest from file... %v", manifest) - lastFile = []api.ContainerManifest{manifest} + case manifests := <-fileChannel: + log.Printf("Got new configuration from file... %v", manifests) + lastFile = manifests case manifests := <-etcdChannel: log.Printf("Got new configuration from etcd... %v", manifests) lastEtcd = manifests - case manifest := <-httpChannel: - log.Printf("Got new manifest from external http... %v", manifest) - lastHttp = []api.ContainerManifest{manifest} - case manifest := <-serverChannel: - log.Printf("Got new manifest from our server... %v", manifest) - lastServer = []api.ContainerManifest{manifest} + case manifests := <-httpChannel: + log.Printf("Got new configuration from external http... %v", manifests) + lastHttp = manifests + case manifests := <-serverChannel: + log.Printf("Got new configuration from our server... %v", manifests) + lastServer = manifests case <-time.After(kl.SyncFrequency): } @@ -683,6 +690,7 @@ func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC manifests = append(manifests, lastEtcd...) manifests = append(manifests, lastHttp...) manifests = append(manifests, lastServer...) + err := handler.SyncManifests(manifests) if err != nil { log.Printf("Couldn't sync containers : %#v", err) diff --git a/pkg/kubelet/kubelet_server.go b/pkg/kubelet/kubelet_server.go index 04899cee38c..34ae0c9bc42 100644 --- a/pkg/kubelet/kubelet_server.go +++ b/pkg/kubelet/kubelet_server.go @@ -29,7 +29,7 @@ import ( type KubeletServer struct { Kubelet kubeletInterface - UpdateChannel chan api.ContainerManifest + UpdateChannel chan []api.ContainerManifest } // kubeletInterface contains all the kubelet methods required by the server. @@ -52,20 +52,31 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } switch { - case u.Path == "/container": + case u.Path == "/container" || u.Path == "/containers": defer req.Body.Close() data, err := ioutil.ReadAll(req.Body) if err != nil { s.error(w, err) return } - var manifest api.ContainerManifest - err = yaml.Unmarshal(data, &manifest) - if err != nil { - s.error(w, err) - return + if u.Path == "/container" { + // This is to provide backward compatibility. It only supports a single manifest + var manifest api.ContainerManifest + err = yaml.Unmarshal(data, &manifest) + if err != nil { + s.error(w, err) + return + } + s.UpdateChannel <- []api.ContainerManifest{manifest} + } else if u.Path == "/containers" { + var manifests []api.ContainerManifest + err = yaml.Unmarshal(data, &manifests) + if err != nil { + s.error(w, err) + return + } + s.UpdateChannel <- manifests } - s.UpdateChannel <- manifest case u.Path == "/containerStats": container := u.Query().Get("container") if len(container) == 0 { diff --git a/pkg/kubelet/kubelet_server_test.go b/pkg/kubelet/kubelet_server_test.go index ba2a3531f46..c8c026d1314 100644 --- a/pkg/kubelet/kubelet_server_test.go +++ b/pkg/kubelet/kubelet_server_test.go @@ -8,7 +8,6 @@ import ( "net/http" "net/http/httptest" "reflect" - "sync" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -33,37 +32,9 @@ func (fk *fakeKubelet) GetContainerStats(name string) (*api.ContainerStats, erro return fk.statsFunc(name) } -// If we made everything distribute a list of ContainerManifests, we could just use -// channelReader. -type channelReaderSingle struct { - list []api.ContainerManifest - wg sync.WaitGroup -} - -func startReadingSingle(channel <-chan api.ContainerManifest) *channelReaderSingle { - cr := &channelReaderSingle{} - cr.wg.Add(1) - go func() { - for { - manifest, ok := <-channel - if !ok { - break - } - cr.list = append(cr.list, manifest) - } - cr.wg.Done() - }() - return cr -} - -func (cr *channelReaderSingle) GetList() []api.ContainerManifest { - cr.wg.Wait() - return cr.list -} - type serverTestFramework struct { - updateChan chan api.ContainerManifest - updateReader *channelReaderSingle + updateChan chan []api.ContainerManifest + updateReader *channelReader serverUnderTest *KubeletServer fakeKubelet *fakeKubelet testHttpServer *httptest.Server @@ -71,9 +42,9 @@ type serverTestFramework struct { func makeServerTest() *serverTestFramework { fw := &serverTestFramework{ - updateChan: make(chan api.ContainerManifest), + updateChan: make(chan []api.ContainerManifest), } - fw.updateReader = startReadingSingle(fw.updateChan) + fw.updateReader = startReading(fw.updateChan) fw.fakeKubelet = &fakeKubelet{} fw.serverUnderTest = &KubeletServer{ Kubelet: fw.fakeKubelet, @@ -91,8 +62,10 @@ func readResp(resp *http.Response) (string, error) { func TestContainer(t *testing.T) { fw := makeServerTest() - expected := api.ContainerManifest{Id: "test_manifest"} - body := bytes.NewBuffer([]byte(util.MakeJSONString(expected))) + expected := []api.ContainerManifest{ + {Id: "test_manifest"}, + } + body := bytes.NewBuffer([]byte(util.MakeJSONString(expected[0]))) // Only send a single ContainerManifest resp, err := http.Post(fw.testHttpServer.URL+"/container", "application/json", body) if err != nil { t.Errorf("Post returned: %v", err) @@ -108,6 +81,28 @@ func TestContainer(t *testing.T) { } } +func TestContainers(t *testing.T) { + fw := makeServerTest() + expected := []api.ContainerManifest{ + {Id: "test_manifest_1"}, + {Id: "test_manifest_2"}, + } + body := bytes.NewBuffer([]byte(util.MakeJSONString(expected))) + resp, err := http.Post(fw.testHttpServer.URL+"/containers", "application/json", body) + if err != nil { + t.Errorf("Post returned: %v", err) + } + resp.Body.Close() + close(fw.updateChan) + received := fw.updateReader.GetList() + if len(received) != 1 { + t.Errorf("Expected 1 update, but got %v", len(received)) + } + if !reflect.DeepEqual(expected, received[0]) { + t.Errorf("Expected %#v, but got %#v", expected, received[0]) + } +} + func TestContainerInfo(t *testing.T) { fw := makeServerTest() expected := "good container info string" diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9116d9ce897..f966489e333 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -16,7 +16,6 @@ limitations under the License. package kubelet import ( - "bytes" "encoding/json" "fmt" "io/ioutil" @@ -730,164 +729,120 @@ func TestMakePortsAndBindings(t *testing.T) { func TestExtractFromNonExistentFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - lastData := []byte{1, 2, 3} - data, err := kubelet.extractFromFile(lastData, "/some/fake/file", changeChannel) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + err := kubelet.extractFromFile("/some/fake/file", changeChannel) + close(changeChannel) + if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + + list := reader.GetList() + if len(list) != 0 { + t.Errorf("Unexpected list: %#v", list) } } func TestExtractFromBadDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - lastData := []byte{1, 2, 3} + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + badData := []byte{1, 2, 3} file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() file.Close() - ioutil.WriteFile(name, lastData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) + ioutil.WriteFile(name, badData, 0755) + err = kubelet.extractFromFile(name, changeChannel) + close(changeChannel) if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + + list := reader.GetList() + if len(list) != 0 { + t.Errorf("Unexpected list: %#v", list) } } -func TestExtractFromSameDataFile(t *testing.T) { +func TestExtractFromValidDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - manifest := api.ContainerManifest{ - Id: "foo", + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + manifests := []api.ContainerManifest{ + {Id: "bar"}, } - lastData, err := json.Marshal(manifest) + data, err := json.Marshal(manifests[0]) // Right now, files only support a single manifest expectNoError(t, err) file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() expectNoError(t, file.Close()) - ioutil.WriteFile(name, lastData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) - - expectNoError(t, err) - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) - } -} - -func TestExtractFromChangedDataFile(t *testing.T) { - kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - reader := startReadingSingle(changeChannel) - oldManifest := api.ContainerManifest{ - Id: "foo", - } - newManifest := api.ContainerManifest{ - Id: "bar", - } - lastData, err := json.Marshal(oldManifest) - expectNoError(t, err) - newData, err := json.Marshal(newManifest) - expectNoError(t, err) - file, err := ioutil.TempFile("", "foo") - expectNoError(t, err) - name := file.Name() - expectNoError(t, file.Close()) - ioutil.WriteFile(name, newData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) + ioutil.WriteFile(name, data, 0755) + err = kubelet.extractFromFile(name, changeChannel) close(changeChannel) expectNoError(t, err) - if !bytes.Equal(data, newData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) - } read := reader.GetList() if len(read) != 1 { t.Errorf("Unexpected channel traffic: %#v", read) } - if !reflect.DeepEqual(read[0], newManifest) { - t.Errorf("Unexpected difference. Expected %#v, got %#v", newManifest, read[0]) + if !reflect.DeepEqual(read[0], manifests) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read[0]) } } func TestExtractFromHttpBadness(t *testing.T) { kubelet := Kubelet{} - lastData := []byte{1, 2, 3} - changeChannel := make(chan api.ContainerManifest) - data, err := kubelet.extractFromHTTP(lastData, "http://localhost:12345", changeChannel) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + err := kubelet.extractFromHTTP("http://localhost:12345", changeChannel) if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(lastData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) + close(changeChannel) + list := reader.GetList() + + if len(list) != 0 { + t.Errorf("Unexpected list: %#v", list) } } -func TestExtractFromHttpNoChange(t *testing.T) { +func TestExtractFromHttp(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) - manifest := api.ContainerManifest{ - Id: "foo", + manifests := []api.ContainerManifest{ + {Id: "foo"}, } - lastData, err := json.Marshal(manifest) + data, err := json.Marshal(manifests) fakeHandler := util.FakeHandler{ StatusCode: 200, - ResponseBody: string(lastData), + ResponseBody: string(data), } testServer := httptest.NewServer(&fakeHandler) - data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) + err = kubelet.extractFromHTTP(testServer.URL, changeChannel) if err != nil { t.Errorf("Unexpected error: %#v", err) } - if !bytes.Equal(lastData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) - } -} - -func TestExtractFromHttpChanges(t *testing.T) { - kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - reader := startReadingSingle(changeChannel) - - manifest := api.ContainerManifest{ - Id: "foo", - } - newManifest := api.ContainerManifest{ - Id: "bar", - } - lastData, _ := json.Marshal(manifest) - newData, _ := json.Marshal(newManifest) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(newData), - } - testServer := httptest.NewServer(&fakeHandler) - - data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) close(changeChannel) read := reader.GetList() - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } if len(read) != 1 { t.Errorf("Unexpected list: %#v", read) } - if !bytes.Equal(newData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) - } - if !reflect.DeepEqual(newManifest, read[0]) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", newManifest, read[0]) + if !reflect.DeepEqual(manifests, read[0]) { + t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0]) } } From d204f76484360f13e7742b5ed6494e23fcb4899d Mon Sep 17 00:00:00 2001 From: Justin Huff Date: Thu, 19 Jun 2014 13:06:52 -0700 Subject: [PATCH 3/5] Add config dir support to kubelet --- cmd/kubelet/kubelet.go | 2 +- cmd/localkube/localkube.go | 2 +- pkg/kubelet/kubelet.go | 64 +++++++++++++++++++++++-------- pkg/kubelet/kubelet_test.go | 76 +++++++++++++++++++++---------------- 4 files changed, 95 insertions(+), 49 deletions(-) diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 7c6071036d1..56308f9e555 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -33,7 +33,7 @@ import ( ) var ( - file = flag.String("config", "", "Path to the config file") + file = flag.String("config", "", "Path to the config file/dir") etcdServers = flag.String("etcd_servers", "", "Url of etcd servers in the cluster") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") diff --git a/cmd/localkube/localkube.go b/cmd/localkube/localkube.go index 5f201162194..42f108fcc12 100644 --- a/cmd/localkube/localkube.go +++ b/cmd/localkube/localkube.go @@ -38,7 +38,7 @@ import ( // kubelet flags var ( - file = flag.String("config", "", "Path to the config file") + file = flag.String("config", "", "Path to the config file/dir") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index fc7236a7235..2f1c922c630 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -26,6 +26,7 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "strconv" "strings" "sync" @@ -364,28 +365,43 @@ func (kl *Kubelet) KillContainer(name string) error { return err } -func (kl *Kubelet) extractFromFile(name string, changeChannel chan<- []api.ContainerManifest) error { +func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) { var file *os.File var err error + var manifest api.ContainerManifest + if file, err = os.Open(name); err != nil { - return err + return manifest, err } - return kl.extractFromReader(file, changeChannel) -} - -func (kl *Kubelet) extractFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { - var manifest api.ContainerManifest - data, err := ioutil.ReadAll(reader) + data, err := ioutil.ReadAll(file) if err != nil { - log.Printf("Couldn't read from reader: %v", err) - return err + log.Printf("Couldn't read from file: %v", err) + return manifest, err } if err = kl.ExtractYAMLData(data, &manifest); err != nil { - return err + return manifest, err } - changeChannel <- []api.ContainerManifest{manifest} - return nil + return manifest, nil +} + +func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) { + var manifests []api.ContainerManifest + + files, err := filepath.Glob(filepath.Join(name, "*")) + if err != nil { + return manifests, err + } + + for _, file := range files { + manifest, err := kl.extractFromFile(file) + if err != nil { + log.Printf("Couldn't read from file %s: %v", file, err) + return manifests, err + } + manifests = append(manifests, manifest) + } + return manifests, nil } func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { @@ -407,10 +423,28 @@ func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel cha func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { for { var err error + time.Sleep(kl.FileCheckFrequency) - err = kl.extractFromFile(file, changeChannel) + + fileInfo, err := os.Stat(file) if err != nil { log.Printf("Error polling file: %#v", err) + continue + } + if fileInfo.IsDir() { + manifests, err := kl.extractFromDir(file) + if err != nil { + log.Printf("Error polling dir: %#v", err) + continue + } + changeChannel <- manifests + } else { + manifest, err := kl.extractFromFile(file) + if err != nil { + log.Printf("Error polling file: %#v", err) + continue + } + changeChannel <- []api.ContainerManifest{manifest} } } } @@ -672,7 +706,7 @@ func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChan for { select { case manifests := <-fileChannel: - log.Printf("Got new configuration from file... %v", manifests) + log.Printf("Got new configuration from file/dir... %v", manifests) lastFile = manifests case manifests := <-etcdChannel: log.Printf("Got new configuration from etcd... %v", manifests) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f966489e333..b0c7b24bf2c 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -729,26 +729,14 @@ func TestMakePortsAndBindings(t *testing.T) { func TestExtractFromNonExistentFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) - - err := kubelet.extractFromFile("/some/fake/file", changeChannel) - close(changeChannel) - + _, err := kubelet.extractFromFile("/some/fake/file") if err == nil { t.Error("Unexpected non-error.") } - - list := reader.GetList() - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } } func TestExtractFromBadDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) badData := []byte{1, 2, 3} file, err := ioutil.TempFile("", "foo") @@ -756,44 +744,68 @@ func TestExtractFromBadDataFile(t *testing.T) { name := file.Name() file.Close() ioutil.WriteFile(name, badData, 0755) - err = kubelet.extractFromFile(name, changeChannel) - close(changeChannel) + _, err = kubelet.extractFromFile(name) if err == nil { t.Error("Unexpected non-error.") } - list := reader.GetList() - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } } func TestExtractFromValidDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan []api.ContainerManifest) - reader := startReading(changeChannel) - manifests := []api.ContainerManifest{ - {Id: "bar"}, - } - data, err := json.Marshal(manifests[0]) // Right now, files only support a single manifest + manifest := api.ContainerManifest{Id: "bar"} + data, err := json.Marshal(manifest) expectNoError(t, err) file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() expectNoError(t, file.Close()) ioutil.WriteFile(name, data, 0755) - err = kubelet.extractFromFile(name, changeChannel) - close(changeChannel) + read, err := kubelet.extractFromFile(name) expectNoError(t, err) - read := reader.GetList() - if len(read) != 1 { - t.Errorf("Unexpected channel traffic: %#v", read) + if !reflect.DeepEqual(read, manifest) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read) } - if !reflect.DeepEqual(read[0], manifests) { - t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read[0]) +} + +func TestExtractFromEmptyDir(t *testing.T) { + kubelet := Kubelet{} + + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + + _, err = kubelet.extractFromDir(dirName) + expectNoError(t, err) +} + +func TestExtractFromDir(t *testing.T) { + kubelet := Kubelet{} + + manifests := []api.ContainerManifest{ + {Id: "foo"}, + {Id: "bar"}, + } + + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + + for _, manifest := range manifests { + data, err := json.Marshal(manifest) + expectNoError(t, err) + file, err := ioutil.TempFile(dirName, "kub") + expectNoError(t, err) + name := file.Name() + expectNoError(t, file.Close()) + ioutil.WriteFile(name, data, 0755) + } + + read, err := kubelet.extractFromDir(dirName) + expectNoError(t, err) + if !reflect.DeepEqual(read, manifests) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read) } } From d5bf045a6c3ee029fbe607181d7e021b10507e8b Mon Sep 17 00:00:00 2001 From: Justin Huff Date: Thu, 19 Jun 2014 14:59:52 -0700 Subject: [PATCH 4/5] Make sure that config files are closed properly --- pkg/kubelet/kubelet.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2f1c922c630..968c153e6c4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -373,6 +373,7 @@ func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) { if file, err = os.Open(name); err != nil { return manifest, err } + defer file.Close() data, err := ioutil.ReadAll(file) if err != nil { From ecf7d1147764ca7e2950c239dfc67edccae89a11 Mon Sep 17 00:00:00 2001 From: Justin Huff Date: Thu, 19 Jun 2014 15:14:57 -0700 Subject: [PATCH 5/5] Make config dir handling deterministic --- pkg/kubelet/kubelet.go | 3 +++ pkg/kubelet/kubelet_test.go | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 968c153e6c4..25cd9fa3e59 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -27,6 +27,7 @@ import ( "os" "os/exec" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -394,6 +395,8 @@ func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) return manifests, err } + sort.Strings(files) + for _, file := range files { manifest, err := kl.extractFromFile(file) if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b0c7b24bf2c..24a3f567380 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -785,8 +785,8 @@ func TestExtractFromDir(t *testing.T) { kubelet := Kubelet{} manifests := []api.ContainerManifest{ - {Id: "foo"}, - {Id: "bar"}, + {Id: "aaaa"}, + {Id: "bbbb"}, } dirName, err := ioutil.TempDir("", "foo") @@ -795,7 +795,7 @@ func TestExtractFromDir(t *testing.T) { for _, manifest := range manifests { data, err := json.Marshal(manifest) expectNoError(t, err) - file, err := ioutil.TempFile(dirName, "kub") + file, err := ioutil.TempFile(dirName, manifest.Id) expectNoError(t, err) name := file.Name() expectNoError(t, file.Close())