Merge pull request #173 from jjhuff/multi_manifest

Add config dir support to kubelet
This commit is contained in:
Daniel Smith 2014-06-20 09:44:36 -07:00
commit f6c17b7af6
7 changed files with 207 additions and 188 deletions

View File

@ -33,7 +33,7 @@ import (
) )
var ( var (
file = flag.String("config", "", "Path to the config file") file = flag.String("config", "", "Path to the config file/dir")
etcdServers = flag.String("etcd_servers", "", "Url of etcd servers in the cluster") etcdServers = flag.String("etcd_servers", "", "Url of etcd servers in the cluster")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data")

View File

@ -38,7 +38,7 @@ import (
// kubelet flags // kubelet flags
var ( var (
file = flag.String("config", "", "Path to the config file") file = flag.String("config", "", "Path to the config file/dir")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data")
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")

View File

@ -46,6 +46,6 @@ ETCD_PID=$!
sleep 5 sleep 5
echo "Running localkube as root (so it can talk to docker's unix socket)" echo "Running localkube as root (so it can talk to docker's unix socket)"
sudo $(dirname $0)/../output/go/localkube sudo $(dirname $0)/../output/go/localkube $*
kill $ETCD_PID kill $ETCD_PID

View File

@ -17,7 +17,6 @@ limitations under the License.
package kubelet package kubelet
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -27,6 +26,8 @@ import (
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -79,12 +80,14 @@ type Kubelet struct {
// Starts background goroutines. If file, manifest_url, or address are empty, // Starts background goroutines. If file, manifest_url, or address are empty,
// they are not watched. Never returns. // they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) { func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {
fileChannel := make(chan api.ContainerManifest) fileChannel := make(chan []api.ContainerManifest)
etcdChannel := make(chan []api.ContainerManifest) etcdChannel := make(chan []api.ContainerManifest)
httpChannel := make(chan api.ContainerManifest) httpChannel := make(chan []api.ContainerManifest)
serverChannel := make(chan api.ContainerManifest) serverChannel := make(chan []api.ContainerManifest)
go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second) if file != "" {
go util.Forever(func() { kl.WatchFile(file, fileChannel) }, 20*time.Second)
}
if manifest_url != "" { if manifest_url != "" {
go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second) go util.Forever(func() { kl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second)
} }
@ -363,72 +366,114 @@ func (kl *Kubelet) KillContainer(name string) error {
return err return err
} }
func (kl *Kubelet) extractFromFile(lastData []byte, name string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) {
var file *os.File var file *os.File
var err error var err error
if file, err = os.Open(name); err != nil {
return lastData, err
}
return kl.extractFromReader(lastData, file, changeChannel)
}
func (kl *Kubelet) extractFromReader(lastData []byte, reader io.Reader, changeChannel chan<- api.ContainerManifest) ([]byte, error) {
var manifest api.ContainerManifest var manifest api.ContainerManifest
data, err := ioutil.ReadAll(reader)
if file, err = os.Open(name); err != nil {
return manifest, err
}
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil { if err != nil {
log.Printf("Couldn't read file: %v", err) log.Printf("Couldn't read from file: %v", err)
return lastData, err return manifest, err
} }
if err = kl.ExtractYAMLData(data, &manifest); err != nil { if err = kl.ExtractYAMLData(data, &manifest); err != nil {
return lastData, err return manifest, err
} }
if !bytes.Equal(lastData, data) { return manifest, nil
lastData = data }
// Ok, we have a valid configuration, send to channel for
// rejiggering. func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) {
changeChannel <- manifest var manifests []api.ContainerManifest
return data, nil
files, err := filepath.Glob(filepath.Join(name, "*"))
if err != nil {
return manifests, err
} }
return lastData, nil
sort.Strings(files)
for _, file := range files {
manifest, err := kl.extractFromFile(file)
if err != nil {
log.Printf("Couldn't read from file %s: %v", file, err)
return manifests, err
}
manifests = append(manifests, manifest)
}
return manifests, nil
}
func (kl *Kubelet) extractMultipleFromReader(reader io.Reader, changeChannel chan<- []api.ContainerManifest) error {
var manifests []api.ContainerManifest
data, err := ioutil.ReadAll(reader)
if err != nil {
log.Printf("Couldn't read from reader: %v", err)
return err
}
if err = kl.ExtractYAMLData(data, &manifests); err != nil {
return err
}
changeChannel <- manifests
return nil
} }
// Watch a file for changes to the set of pods that should run on this Kubelet // Watch a file for changes to the set of pods that should run on this Kubelet
// This function loops forever and is intended to be run as a goroutine // This function loops forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) { func (kl *Kubelet) WatchFile(file string, changeChannel chan<- []api.ContainerManifest) {
var lastData []byte
for { for {
var err error var err error
time.Sleep(kl.FileCheckFrequency) time.Sleep(kl.FileCheckFrequency)
lastData, err = kl.extractFromFile(lastData, file, changeChannel)
fileInfo, err := os.Stat(file)
if err != nil { if err != nil {
log.Printf("Error polling file: %#v", err) log.Printf("Error polling file: %#v", err)
continue
}
if fileInfo.IsDir() {
manifests, err := kl.extractFromDir(file)
if err != nil {
log.Printf("Error polling dir: %#v", err)
continue
}
changeChannel <- manifests
} else {
manifest, err := kl.extractFromFile(file)
if err != nil {
log.Printf("Error polling file: %#v", err)
continue
}
changeChannel <- []api.ContainerManifest{manifest}
} }
} }
} }
func (kl *Kubelet) extractFromHTTP(lastData []byte, url string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { func (kl *Kubelet) extractFromHTTP(url string, changeChannel chan<- []api.ContainerManifest) error {
client := &http.Client{} client := &http.Client{}
request, err := http.NewRequest("GET", url, nil) request, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
return lastData, err return err
} }
response, err := client.Do(request) response, err := client.Do(request)
if err != nil { if err != nil {
return lastData, err return err
} }
defer response.Body.Close() defer response.Body.Close()
return kl.extractFromReader(lastData, response.Body, changeChannel) return kl.extractMultipleFromReader(response.Body, changeChannel)
} }
// Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet // Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet
// This function runs forever and is intended to be run as a goroutine // This function runs forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) { func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- []api.ContainerManifest) {
var lastData []byte
for { for {
var err error var err error
time.Sleep(kl.HTTPCheckFrequency) time.Sleep(kl.HTTPCheckFrequency)
lastData, err = kl.extractFromHTTP(lastData, url, changeChannel) err = kl.extractFromHTTP(url, changeChannel)
if err != nil { if err != nil {
log.Printf("Error syncing http: %#v", err) log.Printf("Error syncing http: %#v", err)
} }
@ -655,27 +700,27 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
} }
// runSyncLoop is the main loop for processing changes. It watches for changes from // runSyncLoop is the main loop for processing changes. It watches for changes from
// four channels (file, etcd, server, and http) and creates a union of the two. For // four channels (file, etcd, server, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If // any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired // no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds. // state every sync_frequency seconds.
// Never returns. // Never returns.
func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileChannel, serverChannel, httpChannel <-chan api.ContainerManifest, handler SyncHandler) { func (kl *Kubelet) RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel <-chan []api.ContainerManifest, handler SyncHandler) {
var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest var lastFile, lastEtcd, lastHttp, lastServer []api.ContainerManifest
for { for {
select { select {
case manifest := <-fileChannel: case manifests := <-fileChannel:
log.Printf("Got new manifest from file... %v", manifest) log.Printf("Got new configuration from file/dir... %v", manifests)
lastFile = []api.ContainerManifest{manifest} lastFile = manifests
case manifests := <-etcdChannel: case manifests := <-etcdChannel:
log.Printf("Got new configuration from etcd... %v", manifests) log.Printf("Got new configuration from etcd... %v", manifests)
lastEtcd = manifests lastEtcd = manifests
case manifest := <-httpChannel: case manifests := <-httpChannel:
log.Printf("Got new manifest from external http... %v", manifest) log.Printf("Got new configuration from external http... %v", manifests)
lastHttp = []api.ContainerManifest{manifest} lastHttp = manifests
case manifest := <-serverChannel: case manifests := <-serverChannel:
log.Printf("Got new manifest from our server... %v", manifest) log.Printf("Got new configuration from our server... %v", manifests)
lastServer = []api.ContainerManifest{manifest} lastServer = manifests
case <-time.After(kl.SyncFrequency): case <-time.After(kl.SyncFrequency):
} }
@ -683,6 +728,7 @@ func (kl *Kubelet) RunSyncLoop(etcdChannel <-chan []api.ContainerManifest, fileC
manifests = append(manifests, lastEtcd...) manifests = append(manifests, lastEtcd...)
manifests = append(manifests, lastHttp...) manifests = append(manifests, lastHttp...)
manifests = append(manifests, lastServer...) manifests = append(manifests, lastServer...)
err := handler.SyncManifests(manifests) err := handler.SyncManifests(manifests)
if err != nil { if err != nil {
log.Printf("Couldn't sync containers : %#v", err) log.Printf("Couldn't sync containers : %#v", err)

View File

@ -29,7 +29,7 @@ import (
type KubeletServer struct { type KubeletServer struct {
Kubelet kubeletInterface Kubelet kubeletInterface
UpdateChannel chan api.ContainerManifest UpdateChannel chan []api.ContainerManifest
} }
// kubeletInterface contains all the kubelet methods required by the server. // kubeletInterface contains all the kubelet methods required by the server.
@ -52,20 +52,31 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
switch { switch {
case u.Path == "/container": case u.Path == "/container" || u.Path == "/containers":
defer req.Body.Close() defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body) data, err := ioutil.ReadAll(req.Body)
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
return return
} }
var manifest api.ContainerManifest if u.Path == "/container" {
err = yaml.Unmarshal(data, &manifest) // This is to provide backward compatibility. It only supports a single manifest
if err != nil { var manifest api.ContainerManifest
s.error(w, err) err = yaml.Unmarshal(data, &manifest)
return if err != nil {
s.error(w, err)
return
}
s.UpdateChannel <- []api.ContainerManifest{manifest}
} else if u.Path == "/containers" {
var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests)
if err != nil {
s.error(w, err)
return
}
s.UpdateChannel <- manifests
} }
s.UpdateChannel <- manifest
case u.Path == "/containerStats": case u.Path == "/containerStats":
container := u.Query().Get("container") container := u.Query().Get("container")
if len(container) == 0 { if len(container) == 0 {

View File

@ -8,7 +8,6 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"sync"
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -33,37 +32,9 @@ func (fk *fakeKubelet) GetContainerStats(name string) (*api.ContainerStats, erro
return fk.statsFunc(name) return fk.statsFunc(name)
} }
// If we made everything distribute a list of ContainerManifests, we could just use
// channelReader.
type channelReaderSingle struct {
list []api.ContainerManifest
wg sync.WaitGroup
}
func startReadingSingle(channel <-chan api.ContainerManifest) *channelReaderSingle {
cr := &channelReaderSingle{}
cr.wg.Add(1)
go func() {
for {
manifest, ok := <-channel
if !ok {
break
}
cr.list = append(cr.list, manifest)
}
cr.wg.Done()
}()
return cr
}
func (cr *channelReaderSingle) GetList() []api.ContainerManifest {
cr.wg.Wait()
return cr.list
}
type serverTestFramework struct { type serverTestFramework struct {
updateChan chan api.ContainerManifest updateChan chan []api.ContainerManifest
updateReader *channelReaderSingle updateReader *channelReader
serverUnderTest *KubeletServer serverUnderTest *KubeletServer
fakeKubelet *fakeKubelet fakeKubelet *fakeKubelet
testHttpServer *httptest.Server testHttpServer *httptest.Server
@ -71,9 +42,9 @@ type serverTestFramework struct {
func makeServerTest() *serverTestFramework { func makeServerTest() *serverTestFramework {
fw := &serverTestFramework{ fw := &serverTestFramework{
updateChan: make(chan api.ContainerManifest), updateChan: make(chan []api.ContainerManifest),
} }
fw.updateReader = startReadingSingle(fw.updateChan) fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{} fw.fakeKubelet = &fakeKubelet{}
fw.serverUnderTest = &KubeletServer{ fw.serverUnderTest = &KubeletServer{
Kubelet: fw.fakeKubelet, Kubelet: fw.fakeKubelet,
@ -91,8 +62,10 @@ func readResp(resp *http.Response) (string, error) {
func TestContainer(t *testing.T) { func TestContainer(t *testing.T) {
fw := makeServerTest() fw := makeServerTest()
expected := api.ContainerManifest{Id: "test_manifest"} expected := []api.ContainerManifest{
body := bytes.NewBuffer([]byte(util.MakeJSONString(expected))) {Id: "test_manifest"},
}
body := bytes.NewBuffer([]byte(util.MakeJSONString(expected[0]))) // Only send a single ContainerManifest
resp, err := http.Post(fw.testHttpServer.URL+"/container", "application/json", body) resp, err := http.Post(fw.testHttpServer.URL+"/container", "application/json", body)
if err != nil { if err != nil {
t.Errorf("Post returned: %v", err) t.Errorf("Post returned: %v", err)
@ -108,6 +81,28 @@ func TestContainer(t *testing.T) {
} }
} }
func TestContainers(t *testing.T) {
fw := makeServerTest()
expected := []api.ContainerManifest{
{Id: "test_manifest_1"},
{Id: "test_manifest_2"},
}
body := bytes.NewBuffer([]byte(util.MakeJSONString(expected)))
resp, err := http.Post(fw.testHttpServer.URL+"/containers", "application/json", body)
if err != nil {
t.Errorf("Post returned: %v", err)
}
resp.Body.Close()
close(fw.updateChan)
received := fw.updateReader.GetList()
if len(received) != 1 {
t.Errorf("Expected 1 update, but got %v", len(received))
}
if !reflect.DeepEqual(expected, received[0]) {
t.Errorf("Expected %#v, but got %#v", expected, received[0])
}
}
func TestContainerInfo(t *testing.T) { func TestContainerInfo(t *testing.T) {
fw := makeServerTest() fw := makeServerTest()
expected := "good container info string" expected := "good container info string"

View File

@ -16,7 +16,6 @@ limitations under the License.
package kubelet package kubelet
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -730,164 +729,132 @@ func TestMakePortsAndBindings(t *testing.T) {
func TestExtractFromNonExistentFile(t *testing.T) { func TestExtractFromNonExistentFile(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest) _, err := kubelet.extractFromFile("/some/fake/file")
lastData := []byte{1, 2, 3}
data, err := kubelet.extractFromFile(lastData, "/some/fake/file", changeChannel)
if err == nil { if err == nil {
t.Error("Unexpected non-error.") t.Error("Unexpected non-error.")
} }
if !bytes.Equal(data, lastData) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
}
} }
func TestExtractFromBadDataFile(t *testing.T) { func TestExtractFromBadDataFile(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
lastData := []byte{1, 2, 3} badData := []byte{1, 2, 3}
file, err := ioutil.TempFile("", "foo") file, err := ioutil.TempFile("", "foo")
expectNoError(t, err) expectNoError(t, err)
name := file.Name() name := file.Name()
file.Close() file.Close()
ioutil.WriteFile(name, lastData, 0755) ioutil.WriteFile(name, badData, 0755)
data, err := kubelet.extractFromFile(lastData, name, changeChannel) _, err = kubelet.extractFromFile(name)
if err == nil { if err == nil {
t.Error("Unexpected non-error.") t.Error("Unexpected non-error.")
} }
if !bytes.Equal(data, lastData) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
}
} }
func TestExtractFromSameDataFile(t *testing.T) { func TestExtractFromValidDataFile(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
manifest := api.ContainerManifest{ manifest := api.ContainerManifest{Id: "bar"}
Id: "foo", data, err := json.Marshal(manifest)
}
lastData, err := json.Marshal(manifest)
expectNoError(t, err) expectNoError(t, err)
file, err := ioutil.TempFile("", "foo") file, err := ioutil.TempFile("", "foo")
expectNoError(t, err) expectNoError(t, err)
name := file.Name() name := file.Name()
expectNoError(t, file.Close()) expectNoError(t, file.Close())
ioutil.WriteFile(name, lastData, 0755) ioutil.WriteFile(name, data, 0755)
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
read, err := kubelet.extractFromFile(name)
expectNoError(t, err) expectNoError(t, err)
if !bytes.Equal(data, lastData) { if !reflect.DeepEqual(read, manifest) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read)
} }
} }
func TestExtractFromChangedDataFile(t *testing.T) { func TestExtractFromEmptyDir(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
reader := startReadingSingle(changeChannel)
oldManifest := api.ContainerManifest{
Id: "foo",
}
newManifest := api.ContainerManifest{
Id: "bar",
}
lastData, err := json.Marshal(oldManifest)
expectNoError(t, err)
newData, err := json.Marshal(newManifest)
expectNoError(t, err)
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, newData, 0755)
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
close(changeChannel)
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err) expectNoError(t, err)
if !bytes.Equal(data, newData) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) _, err = kubelet.extractFromDir(dirName)
expectNoError(t, err)
}
func TestExtractFromDir(t *testing.T) {
kubelet := Kubelet{}
manifests := []api.ContainerManifest{
{Id: "aaaa"},
{Id: "bbbb"},
} }
read := reader.GetList()
if len(read) != 1 { dirName, err := ioutil.TempDir("", "foo")
t.Errorf("Unexpected channel traffic: %#v", read) expectNoError(t, err)
for _, manifest := range manifests {
data, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile(dirName, manifest.Id)
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755)
} }
if !reflect.DeepEqual(read[0], newManifest) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", newManifest, read[0]) read, err := kubelet.extractFromDir(dirName)
expectNoError(t, err)
if !reflect.DeepEqual(read, manifests) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read)
} }
} }
func TestExtractFromHttpBadness(t *testing.T) { func TestExtractFromHttpBadness(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
lastData := []byte{1, 2, 3} changeChannel := make(chan []api.ContainerManifest)
changeChannel := make(chan api.ContainerManifest) reader := startReading(changeChannel)
data, err := kubelet.extractFromHTTP(lastData, "http://localhost:12345", changeChannel)
err := kubelet.extractFromHTTP("http://localhost:12345", changeChannel)
if err == nil { if err == nil {
t.Error("Unexpected non-error.") t.Error("Unexpected non-error.")
} }
if !bytes.Equal(lastData, data) { close(changeChannel)
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
} }
} }
func TestExtractFromHttpNoChange(t *testing.T) { func TestExtractFromHttp(t *testing.T) {
kubelet := Kubelet{} kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest) changeChannel := make(chan []api.ContainerManifest)
reader := startReading(changeChannel)
manifest := api.ContainerManifest{ manifests := []api.ContainerManifest{
Id: "foo", {Id: "foo"},
} }
lastData, err := json.Marshal(manifest) data, err := json.Marshal(manifests)
fakeHandler := util.FakeHandler{ fakeHandler := util.FakeHandler{
StatusCode: 200, StatusCode: 200,
ResponseBody: string(lastData), ResponseBody: string(data),
} }
testServer := httptest.NewServer(&fakeHandler) testServer := httptest.NewServer(&fakeHandler)
data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) err = kubelet.extractFromHTTP(testServer.URL, changeChannel)
if err != nil { if err != nil {
t.Errorf("Unexpected error: %#v", err) t.Errorf("Unexpected error: %#v", err)
} }
if !bytes.Equal(lastData, data) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data)
}
}
func TestExtractFromHttpChanges(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
reader := startReadingSingle(changeChannel)
manifest := api.ContainerManifest{
Id: "foo",
}
newManifest := api.ContainerManifest{
Id: "bar",
}
lastData, _ := json.Marshal(manifest)
newData, _ := json.Marshal(newManifest)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(newData),
}
testServer := httptest.NewServer(&fakeHandler)
data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel)
close(changeChannel) close(changeChannel)
read := reader.GetList() read := reader.GetList()
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if len(read) != 1 { if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read) t.Errorf("Unexpected list: %#v", read)
} }
if !bytes.Equal(newData, data) { if !reflect.DeepEqual(manifests, read[0]) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
}
if !reflect.DeepEqual(newManifest, read[0]) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", newManifest, read[0])
} }
} }