diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 7c6071036d1..56308f9e555 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -33,7 +33,7 @@ import ( ) var ( - file = flag.String("config", "", "Path to the config file") + file = flag.String("config", "", "Path to the config file/dir") etcdServers = flag.String("etcd_servers", "", "Url of etcd servers in the cluster") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") diff --git a/cmd/localkube/localkube.go b/cmd/localkube/localkube.go index 5f201162194..42f108fcc12 100644 --- a/cmd/localkube/localkube.go +++ b/cmd/localkube/localkube.go @@ -38,7 +38,7 @@ import ( // kubelet flags var ( - file = flag.String("config", "", "Path to the config file") + file = flag.String("config", "", "Path to the config file/dir") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") diff --git a/hack/local-up.sh b/hack/local-up.sh index ff1ee378772..7cd2b110048 100755 --- a/hack/local-up.sh +++ b/hack/local-up.sh @@ -46,6 +46,6 @@ ETCD_PID=$! sleep 5 echo "Running localkube as root (so it can talk to docker's unix socket)" -sudo $(dirname $0)/../output/go/localkube +sudo $(dirname $0)/../output/go/localkube $* kill $ETCD_PID diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a1c7dac82d9..25cd9fa3e59 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -17,7 +17,6 @@ limitations under the License. package kubelet import ( - "bytes" "encoding/json" "fmt" "io" @@ -27,6 +26,8 @@ import ( "net/http" "os" "os/exec" + "path/filepath" + "sort" "strconv" "strings" "sync" @@ -79,12 +80,14 @@ type Kubelet struct { // Starts background goroutines. If file, manifest_url, or address are empty, // they are not watched. Never returns. func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { - fileChannel := make(chan api.ContainerManifest) + fileChannel := make(chan []api.ContainerManifest) etcdChannel := make(chan []api.ContainerManifest) - httpChannel := make(chan api.ContainerManifest) - serverChannel := make(chan api.ContainerManifest) + httpChannel := make(chan []api.ContainerManifest) + serverChannel := make(chan []api.ContainerManifest) - go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) + if file != "" { + go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) + } if manifest_url != "" { go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) } @@ -363,72 +366,114 @@ func (kl *Kubelet) KillContainer(name string) error { return err } -func (kl *Kubelet) extractFromFile(lastData []byte, name string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) { var file *os.File var err error - if file, err = os.Open(name); err != nil { - return lastData, err - } - - return kl.extractFromReader(lastData, file, changeChannel) -} - -func (kl *Kubelet) extractFromReader(lastData []byte, reader io.Reader, changeChannel chan<- api.ContainerManifest) ([]byte, error) { var manifest api.ContainerManifest - data, err := ioutil.ReadAll(reader) + + if file, err = os.Open(name); err != nil { + return manifest, err + } + defer file.Close() + + data, err := ioutil.ReadAll(file) if err != nil { - log.Printf("Couldn't read file: %v", err) - return lastData, err + log.Printf("Couldn't read from file: %v", err) + return manifest, err } if err = kl.ExtractYAMLData(data, &manifest); err != nil { - return lastData, err + return manifest, err } - if !bytes.Equal(lastData, data) { - lastData = data - // Ok, we have a valid configuration, send to channel for - // rejiggering. - changeChannel <- manifest - return data, nil + return manifest, nil +} + +func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) { + var manifests []api.ContainerManifest + + files, err := filepath.Glob(filepath.Join(name, "*")) + if err != nil { + return manifests, err } - return lastData, nil + + sort.Strings(files) + + for _, file := range files { + manifest, err := kl.extractFromFile(file) + if err != nil { + log.Printf("Couldn't read from file %s: %v", file, err) + return manifests, err + } + manifests = append(manifests, manifest) + } + return manifests, nil +} + +func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error { + var manifests []api.ContainerManifest + data, err := ioutil.ReadAll(reader) + if err != nil { + log.Printf("Couldn't read from reader: %v", err) + return err + } + if err = kl.ExtractYAMLData(data, &manifests); err != nil { + return err + } + changeChannel <- manifests + return nil } // Watch a file for changes to the set of pods that should run on this Kubelet // This function loops forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) { - var lastData []byte +func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) { for { var err error + time.Sleep(kl.FileCheckFrequency) - lastData, err = kl.extractFromFile(lastData, file, changeChannel) + + fileInfo, err := os.Stat(file) if err != nil { log.Printf("Error polling file: %#v", err) + continue + } + if fileInfo.IsDir() { + manifests, err := kl.extractFromDir(file) + if err != nil { + log.Printf("Error polling dir: %#v", err) + continue + } + changeChannel <- manifests + } else { + manifest, err := kl.extractFromFile(file) + if err != nil { + log.Printf("Error polling file: %#v", err) + continue + } + changeChannel <- []api.ContainerManifest{manifest} } } } -func (kl *Kubelet) extractFromHTTP(lastData []byte, url string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { +func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.ContainerManifest) error { client := &http.Client{} request, err := http.NewRequest("GET", url, nil) if err != nil { - return lastData, err + return err } response, err := client.Do(request) if err != nil { - return lastData, err + return err } defer response.Body.Close() - return kl.extractFromReader(lastData, response.Body, changeChannel) + return kl.extractMultipleFromReader(response.Body, changeChannel) } // Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet // This function runs forever and is intended to be run as a goroutine -func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) { - var lastData []byte +func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- []api.ContainerManifest) { for { var err error time.Sleep(kl.HTTPCheckFrequency) - lastData, err = kl.extractFromHTTP(lastData, url, changeChannel) + err = kl.extractFromHTTP(url, changeChannel) if err != nil { log.Printf("Error syncing http: %#v", err) } @@ -655,27 +700,27 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { } // runSyncLoop is the main loop for processing changes. It watches for changes from -// four channels (file, etcd, server, and http) and creates a union of the two. For +// four channels (file, etcd, server, and http) and creates a union of them. For // any new change seen, will run a sync against desired state and running state. If // no changes are seen to the configuration, will synchronize the last known desired // state every sync_frequency seconds. // Never returns. -func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) { +func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel <-chan []api.ContainerManifest, handler SyncHandler) { var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest for { select { - case manifest := <-fileChannel: - log.Printf("Got new manifest from file... %v", manifest) - lastFile = []api.ContainerManifest{manifest} + case manifests := <-fileChannel: + log.Printf("Got new configuration from file/dir... %v", manifests) + lastFile = manifests case manifests := <-etcdChannel: log.Printf("Got new configuration from etcd... %v", manifests) lastEtcd = manifests - case manifest := <-httpChannel: - log.Printf("Got new manifest from external http... %v", manifest) - lastHttp = []api.ContainerManifest{manifest} - case manifest := <-serverChannel: - log.Printf("Got new manifest from our server... %v", manifest) - lastServer = []api.ContainerManifest{manifest} + case manifests := <-httpChannel: + log.Printf("Got new configuration from external http... %v", manifests) + lastHttp = manifests + case manifests := <-serverChannel: + log.Printf("Got new configuration from our server... %v", manifests) + lastServer = manifests case <-time.After(kl.SyncFrequency): } @@ -683,6 +728,7 @@ func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC manifests = append(manifests, lastEtcd...) manifests = append(manifests, lastHttp...) manifests = append(manifests, lastServer...) + err := handler.SyncManifests(manifests) if err != nil { log.Printf("Couldn't sync containers : %#v", err) diff --git a/pkg/kubelet/kubelet_server.go b/pkg/kubelet/kubelet_server.go index 04899cee38c..34ae0c9bc42 100644 --- a/pkg/kubelet/kubelet_server.go +++ b/pkg/kubelet/kubelet_server.go @@ -29,7 +29,7 @@ import ( type KubeletServer struct { Kubelet kubeletInterface - UpdateChannel chan api.ContainerManifest + UpdateChannel chan []api.ContainerManifest } // kubeletInterface contains all the kubelet methods required by the server. @@ -52,20 +52,31 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } switch { - case u.Path == "/container": + case u.Path == "/container" || u.Path == "/containers": defer req.Body.Close() data, err := ioutil.ReadAll(req.Body) if err != nil { s.error(w, err) return } - var manifest api.ContainerManifest - err = yaml.Unmarshal(data, &manifest) - if err != nil { - s.error(w, err) - return + if u.Path == "/container" { + // This is to provide backward compatibility. It only supports a single manifest + var manifest api.ContainerManifest + err = yaml.Unmarshal(data, &manifest) + if err != nil { + s.error(w, err) + return + } + s.UpdateChannel <- []api.ContainerManifest{manifest} + } else if u.Path == "/containers" { + var manifests []api.ContainerManifest + err = yaml.Unmarshal(data, &manifests) + if err != nil { + s.error(w, err) + return + } + s.UpdateChannel <- manifests } - s.UpdateChannel <- manifest case u.Path == "/containerStats": container := u.Query().Get("container") if len(container) == 0 { diff --git a/pkg/kubelet/kubelet_server_test.go b/pkg/kubelet/kubelet_server_test.go index ba2a3531f46..c8c026d1314 100644 --- a/pkg/kubelet/kubelet_server_test.go +++ b/pkg/kubelet/kubelet_server_test.go @@ -8,7 +8,6 @@ import ( "net/http" "net/http/httptest" "reflect" - "sync" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -33,37 +32,9 @@ func (fk *fakeKubelet) GetContainerStats(name string) (*api.ContainerStats, erro return fk.statsFunc(name) } -// If we made everything distribute a list of ContainerManifests, we could just use -// channelReader. -type channelReaderSingle struct { - list []api.ContainerManifest - wg sync.WaitGroup -} - -func startReadingSingle(channel <-chan api.ContainerManifest) *channelReaderSingle { - cr := &channelReaderSingle{} - cr.wg.Add(1) - go func() { - for { - manifest, ok := <-channel - if !ok { - break - } - cr.list = append(cr.list, manifest) - } - cr.wg.Done() - }() - return cr -} - -func (cr *channelReaderSingle) GetList() []api.ContainerManifest { - cr.wg.Wait() - return cr.list -} - type serverTestFramework struct { - updateChan chan api.ContainerManifest - updateReader *channelReaderSingle + updateChan chan []api.ContainerManifest + updateReader *channelReader serverUnderTest *KubeletServer fakeKubelet *fakeKubelet testHttpServer *httptest.Server @@ -71,9 +42,9 @@ type serverTestFramework struct { func makeServerTest() *serverTestFramework { fw := &serverTestFramework{ - updateChan: make(chan api.ContainerManifest), + updateChan: make(chan []api.ContainerManifest), } - fw.updateReader = startReadingSingle(fw.updateChan) + fw.updateReader = startReading(fw.updateChan) fw.fakeKubelet = &fakeKubelet{} fw.serverUnderTest = &KubeletServer{ Kubelet: fw.fakeKubelet, @@ -91,8 +62,10 @@ func readResp(resp *http.Response) (string, error) { func TestContainer(t *testing.T) { fw := makeServerTest() - expected := api.ContainerManifest{Id: "test_manifest"} - body := bytes.NewBuffer([]byte(util.MakeJSONString(expected))) + expected := []api.ContainerManifest{ + {Id: "test_manifest"}, + } + body := bytes.NewBuffer([]byte(util.MakeJSONString(expected[0]))) // Only send a single ContainerManifest resp, err := http.Post(fw.testHttpServer.URL+"/container", "application/json", body) if err != nil { t.Errorf("Post returned: %v", err) @@ -108,6 +81,28 @@ func TestContainer(t *testing.T) { } } +func TestContainers(t *testing.T) { + fw := makeServerTest() + expected := []api.ContainerManifest{ + {Id: "test_manifest_1"}, + {Id: "test_manifest_2"}, + } + body := bytes.NewBuffer([]byte(util.MakeJSONString(expected))) + resp, err := http.Post(fw.testHttpServer.URL+"/containers", "application/json", body) + if err != nil { + t.Errorf("Post returned: %v", err) + } + resp.Body.Close() + close(fw.updateChan) + received := fw.updateReader.GetList() + if len(received) != 1 { + t.Errorf("Expected 1 update, but got %v", len(received)) + } + if !reflect.DeepEqual(expected, received[0]) { + t.Errorf("Expected %#v, but got %#v", expected, received[0]) + } +} + func TestContainerInfo(t *testing.T) { fw := makeServerTest() expected := "good container info string" diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9116d9ce897..24a3f567380 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -16,7 +16,6 @@ limitations under the License. package kubelet import ( - "bytes" "encoding/json" "fmt" "io/ioutil" @@ -730,164 +729,132 @@ func TestMakePortsAndBindings(t *testing.T) { func TestExtractFromNonExistentFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - lastData := []byte{1, 2, 3} - data, err := kubelet.extractFromFile(lastData, "/some/fake/file", changeChannel) + _, err := kubelet.extractFromFile("/some/fake/file") if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) - } } func TestExtractFromBadDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - lastData := []byte{1, 2, 3} + + badData := []byte{1, 2, 3} file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() file.Close() - ioutil.WriteFile(name, lastData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) + ioutil.WriteFile(name, badData, 0755) + _, err = kubelet.extractFromFile(name) if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) - } + } -func TestExtractFromSameDataFile(t *testing.T) { +func TestExtractFromValidDataFile(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - manifest := api.ContainerManifest{ - Id: "foo", - } - lastData, err := json.Marshal(manifest) + + manifest := api.ContainerManifest{Id: "bar"} + data, err := json.Marshal(manifest) expectNoError(t, err) file, err := ioutil.TempFile("", "foo") expectNoError(t, err) name := file.Name() expectNoError(t, file.Close()) - ioutil.WriteFile(name, lastData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) + ioutil.WriteFile(name, data, 0755) + read, err := kubelet.extractFromFile(name) expectNoError(t, err) - if !bytes.Equal(data, lastData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + if !reflect.DeepEqual(read, manifest) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read) } } -func TestExtractFromChangedDataFile(t *testing.T) { +func TestExtractFromEmptyDir(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - reader := startReadingSingle(changeChannel) - oldManifest := api.ContainerManifest{ - Id: "foo", - } - newManifest := api.ContainerManifest{ - Id: "bar", - } - lastData, err := json.Marshal(oldManifest) - expectNoError(t, err) - newData, err := json.Marshal(newManifest) - expectNoError(t, err) - file, err := ioutil.TempFile("", "foo") - expectNoError(t, err) - name := file.Name() - expectNoError(t, file.Close()) - ioutil.WriteFile(name, newData, 0755) - data, err := kubelet.extractFromFile(lastData, name, changeChannel) - close(changeChannel) + dirName, err := ioutil.TempDir("", "foo") expectNoError(t, err) - if !bytes.Equal(data, newData) { - t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + + _, err = kubelet.extractFromDir(dirName) + expectNoError(t, err) +} + +func TestExtractFromDir(t *testing.T) { + kubelet := Kubelet{} + + manifests := []api.ContainerManifest{ + {Id: "aaaa"}, + {Id: "bbbb"}, } - read := reader.GetList() - if len(read) != 1 { - t.Errorf("Unexpected channel traffic: %#v", read) + + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + + for _, manifest := range manifests { + data, err := json.Marshal(manifest) + expectNoError(t, err) + file, err := ioutil.TempFile(dirName, manifest.Id) + expectNoError(t, err) + name := file.Name() + expectNoError(t, file.Close()) + ioutil.WriteFile(name, data, 0755) } - if !reflect.DeepEqual(read[0], newManifest) { - t.Errorf("Unexpected difference. Expected %#v, got %#v", newManifest, read[0]) + + read, err := kubelet.extractFromDir(dirName) + expectNoError(t, err) + if !reflect.DeepEqual(read, manifests) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read) } } func TestExtractFromHttpBadness(t *testing.T) { kubelet := Kubelet{} - lastData := []byte{1, 2, 3} - changeChannel := make(chan api.ContainerManifest) - data, err := kubelet.extractFromHTTP(lastData, "http://localhost:12345", changeChannel) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) + + err := kubelet.extractFromHTTP("http://localhost:12345", changeChannel) if err == nil { t.Error("Unexpected non-error.") } - if !bytes.Equal(lastData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) + close(changeChannel) + list := reader.GetList() + + if len(list) != 0 { + t.Errorf("Unexpected list: %#v", list) } } -func TestExtractFromHttpNoChange(t *testing.T) { +func TestExtractFromHttp(t *testing.T) { kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) + changeChannel := make(chan []api.ContainerManifest) + reader := startReading(changeChannel) - manifest := api.ContainerManifest{ - Id: "foo", + manifests := []api.ContainerManifest{ + {Id: "foo"}, } - lastData, err := json.Marshal(manifest) + data, err := json.Marshal(manifests) fakeHandler := util.FakeHandler{ StatusCode: 200, - ResponseBody: string(lastData), + ResponseBody: string(data), } testServer := httptest.NewServer(&fakeHandler) - data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) + err = kubelet.extractFromHTTP(testServer.URL, changeChannel) if err != nil { t.Errorf("Unexpected error: %#v", err) } - if !bytes.Equal(lastData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) - } -} - -func TestExtractFromHttpChanges(t *testing.T) { - kubelet := Kubelet{} - changeChannel := make(chan api.ContainerManifest) - reader := startReadingSingle(changeChannel) - - manifest := api.ContainerManifest{ - Id: "foo", - } - newManifest := api.ContainerManifest{ - Id: "bar", - } - lastData, _ := json.Marshal(manifest) - newData, _ := json.Marshal(newManifest) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(newData), - } - testServer := httptest.NewServer(&fakeHandler) - - data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) close(changeChannel) read := reader.GetList() - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } if len(read) != 1 { t.Errorf("Unexpected list: %#v", read) } - if !bytes.Equal(newData, data) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) - } - if !reflect.DeepEqual(newManifest, read[0]) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", newManifest, read[0]) + if !reflect.DeepEqual(manifests, read[0]) { + t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0]) } }