From 09294b90ce8dae05f982da05c6b82e9077a18e3f Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 15 Jul 2014 10:52:39 -0400 Subject: [PATCH 1/2] Refactor Kubelet config sources for clarity Create a new "Pod" concept which can identify pods from many config sources. --- pkg/api/validation.go | 3 - pkg/api/validation_test.go | 7 +- pkg/kubelet/config/config.go | 304 ++++++++++++++++++++++++++++++ pkg/kubelet/config/config_test.go | 212 +++++++++++++++++++++ pkg/kubelet/config/etcd.go | 140 ++++++++++++++ pkg/kubelet/config/etcd_test.go | 147 +++++++++++++++ pkg/kubelet/config/file.go | 149 +++++++++++++++ pkg/kubelet/config/file_test.go | 211 +++++++++++++++++++++ pkg/kubelet/config/http.go | 117 ++++++++++++ pkg/kubelet/config/http_test.go | 124 ++++++++++++ pkg/kubelet/types.go | 60 ++++++ pkg/kubelet/validation.go | 36 ++++ pkg/kubelet/validation_test.go | 42 +++++ pkg/tools/etcd_tools.go | 5 + pkg/tools/fake_etcd_client.go | 4 + 15 files changed, 1553 insertions(+), 8 deletions(-) create mode 100644 pkg/kubelet/config/config.go create mode 100644 pkg/kubelet/config/config_test.go create mode 100644 pkg/kubelet/config/etcd.go create mode 100644 pkg/kubelet/config/etcd_test.go create mode 100644 pkg/kubelet/config/file.go create mode 100644 pkg/kubelet/config/file_test.go create mode 100644 pkg/kubelet/config/http.go create mode 100644 pkg/kubelet/config/http_test.go create mode 100644 pkg/kubelet/types.go create mode 100644 pkg/kubelet/validation.go create mode 100644 pkg/kubelet/validation_test.go diff --git a/pkg/api/validation.go b/pkg/api/validation.go index 8b7c76609c3..ecef6e88985 100644 --- a/pkg/api/validation.go +++ b/pkg/api/validation.go @@ -272,9 +272,6 @@ func ValidateManifest(manifest *ContainerManifest) []error { } else if !supportedManifestVersions.Has(strings.ToLower(manifest.Version)) { allErrs.Append(makeNotSupportedError("ContainerManifest.Version", manifest.Version)) } - if !util.IsDNSSubdomain(manifest.ID) { - allErrs.Append(makeInvalidError("ContainerManifest.ID", manifest.ID)) - } allVolumes, errs := validateVolumes(manifest.Volumes) if len(errs) != 0 { allErrs.Append(errs...) diff --git a/pkg/api/validation_test.go b/pkg/api/validation_test.go index 5a82be65081..6916db9da51 100644 --- a/pkg/api/validation_test.go +++ b/pkg/api/validation_test.go @@ -242,11 +242,8 @@ func TestValidateManifest(t *testing.T) { } errorCases := map[string]ContainerManifest{ - "empty version": {Version: "", ID: "abc"}, - "invalid version": {Version: "bogus", ID: "abc"}, - "zero-length id": {Version: "v1beta1", ID: ""}, - "id > 255 characters": {Version: "v1beta1", ID: strings.Repeat("a", 256)}, - "id not a DNS subdomain": {Version: "v1beta1", ID: "a.b.c."}, + "empty version": {Version: "", ID: "abc"}, + "invalid version": {Version: "bogus", ID: "abc"}, "invalid volume name": { Version: "v1beta1", ID: "abc", diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go new file mode 100644 index 00000000000..33206edce1a --- /dev/null +++ b/pkg/kubelet/config/config.go @@ -0,0 +1,304 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "fmt" + "reflect" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" + "github.com/golang/glog" +) + +// PodConfigListener receives notifications for changes to a configuration. +type PodConfigListener interface { + // OnUpdate is invoked when the kubelet.Pod configuration has been changed by one of + // the sources. The update is properly normalized to remove duplicates. + OnUpdate(pod kubelet.PodUpdate) +} + +// ListenerFunc implements the PodConfigListener interface +type ListenerFunc func(update kubelet.PodUpdate) + +func (h ListenerFunc) OnUpdate(update kubelet.PodUpdate) { + h(update) +} + +// PodConfigNotificationMode describes how changes are sent to the update channel +type PodConfigNotificationMode int + +const ( + // PodConfigNotificationSnapshot delivers the full configuration as a SET whenever + // any change occurs + PodConfigNotificationSnapshot = iota + // PodConfigNotificationSetsAndUpdates delivers an UPDATE message whenever pods are + // changed, and a SET message if there are any additions or removals. + PodConfigNotificationSnapshotAndUpdates + // PodConfigNotificationIncremental delivers ADD, UPDATE, and REMOVE to the update channel + PodConfigNotificationIncremental +) + +// PodConfig is a configuration mux that merges many sources of pod configuration into a single +// consistent structure, and then delivers incremental change notifications to listeners +// in order. +type PodConfig struct { + pods *podStorage + mux *config.Mux + + // the channel of denormalized changes passed to listeners + updates chan kubelet.PodUpdate +} + +// NewPodConfig creates an object that can merge many configuration sources into a stream +// of normalized updates to a pod configuration. +func NewPodConfig(mode PodConfigNotificationMode) *PodConfig { + updates := make(chan kubelet.PodUpdate, 1) + storage := newPodStorage(updates, mode) + podConfig := &PodConfig{ + pods: storage, + mux: config.NewMux(storage), + updates: updates, + } + return podConfig +} + +// Channel creates or returns a config source channel. The channel +// only accepts PodUpdates +func (c *PodConfig) Channel(source string) chan<- interface{} { + return c.mux.Channel(source) +} + +// Updates returns a channel of updates to the configuration, properly denormalized. +func (c *PodConfig) Updates() <-chan kubelet.PodUpdate { + return c.updates +} + +// Sync requests the full configuration be delivered to the update channel. +func (c *PodConfig) Sync() { + c.pods.Sync() +} + +// podStorage manages the current pod state at any point in time and ensures updates +// to the channel are delivered in order. Note that this object is an in-memory source of +// "truth" and on creation contains zero entries. Once all previously read sources are +// available, then this object should be considered authoritative. +type podStorage struct { + podLock sync.RWMutex + // map of source name to pod name to pod reference + pods map[string]map[string]*kubelet.Pod + mode PodConfigNotificationMode + + // ensures that updates are delivered in strict order + // on the updates channel + updateLock sync.Mutex + updates chan<- kubelet.PodUpdate +} + +// TODO: PodConfigNotificationMode could be handled by a listener to the updates channel +// in the future, especially with multiple listeners. +// TODO: allow initialization of the current state of the store with snapshotted version. +func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage { + return &podStorage{ + pods: make(map[string]map[string]*kubelet.Pod), + mode: mode, + updates: updates, + } +} + +// Merge normalizes a set of incoming changes from different sources into a map of all Pods +// and ensures that redundant changes are filtered out, and then pushes zero or more minimal +// updates onto the update channel. Ensures that updates are delivered in order. +func (s *podStorage) Merge(source string, change interface{}) error { + s.updateLock.Lock() + defer s.updateLock.Unlock() + + adds, updates, deletes := s.merge(source, change) + + // deliver update notifications + switch s.mode { + case PodConfigNotificationIncremental: + if len(deletes.Pods) > 0 { + s.updates <- *deletes + } + if len(adds.Pods) > 0 { + s.updates <- *adds + } + if len(updates.Pods) > 0 { + s.updates <- *updates + } + + case PodConfigNotificationSnapshotAndUpdates: + if len(updates.Pods) > 0 { + s.updates <- *updates + } + if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { + s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + } + + case PodConfigNotificationSnapshot: + if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { + s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + } + + default: + panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode)) + } + + return nil +} + +func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubelet.PodUpdate) { + s.podLock.Lock() + defer s.podLock.Unlock() + + adds = &kubelet.PodUpdate{Op: kubelet.ADD} + updates = &kubelet.PodUpdate{Op: kubelet.UPDATE} + deletes = &kubelet.PodUpdate{Op: kubelet.REMOVE} + + pods := s.pods[source] + if pods == nil { + pods = make(map[string]*kubelet.Pod) + } + + update := change.(kubelet.PodUpdate) + switch update.Op { + case kubelet.ADD, kubelet.UPDATE: + if update.Op == kubelet.ADD { + glog.Infof("Adding new pods from source %s : %v", source, update.Pods) + } else { + glog.Infof("Updating pods from source %s : %v", source, update.Pods) + } + + filtered := filterInvalidPods(update.Pods, source) + for _, ref := range filtered { + name := ref.Name + if existing, found := pods[name]; found { + if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + // this is an update + existing.Manifest = ref.Manifest + updates.Pods = append(updates.Pods, *existing) + continue + } + // this is a no-op + continue + } + // this is an add + ref.Namespace = source + pods[name] = ref + adds.Pods = append(adds.Pods, *ref) + } + + case kubelet.REMOVE: + glog.Infof("Removing a pod %v", update) + for _, value := range update.Pods { + name := value.Name + if existing, found := pods[name]; found { + // this is a delete + delete(pods, name) + deletes.Pods = append(deletes.Pods, *existing) + continue + } + // this is a no-op + } + + case kubelet.SET: + glog.Infof("Setting pods for source %s : %v", source, update) + // Clear the old map entries by just creating a new map + oldPods := pods + pods = make(map[string]*kubelet.Pod) + + filtered := filterInvalidPods(update.Pods, source) + for _, ref := range filtered { + name := ref.Name + if existing, found := oldPods[name]; found { + pods[name] = existing + if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + // this is an update + existing.Manifest = ref.Manifest + updates.Pods = append(updates.Pods, *existing) + continue + } + // this is a no-op + continue + } + ref.Namespace = source + pods[name] = ref + adds.Pods = append(adds.Pods, *ref) + } + + for name, existing := range oldPods { + if _, found := pods[name]; !found { + // this is a delete + deletes.Pods = append(deletes.Pods, *existing) + } + } + + default: + glog.Infof("Received invalid update type: %v", update) + + } + + s.pods[source] = pods + return adds, updates, deletes +} + +func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.Pod) { + names := util.StringSet{} + for i := range pods { + var errors []error + if names.Has(pods[i].Name) { + errors = append(errors, api.ValidationError{api.ErrTypeDuplicate, "Pod.Name", pods[i].Name}) + } else { + names.Insert(pods[i].Name) + } + if errs := kubelet.ValidatePod(&pods[i]); len(errs) != 0 { + errors = append(errors, errs...) + } + if len(errors) > 0 { + glog.Warningf("Pod %d from %s failed validation, ignoring: %v", i+1, source, errors) + continue + } + filtered = append(filtered, &pods[i]) + } + return +} + +// Sync sends a copy of the current state through the update channel +func (s *podStorage) Sync() { + s.updateLock.Lock() + defer s.updateLock.Unlock() + s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} +} + +// Object implements config.Accessor +func (s *podStorage) MergedState() interface{} { + s.podLock.RLock() + defer s.podLock.RUnlock() + pods := make([]kubelet.Pod, 0) + for source, sourcePods := range s.pods { + for _, podRef := range sourcePods { + pod := *podRef + pod.Namespace = source + pods = append(pods, pod) + } + } + return pods +} diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go new file mode 100644 index 00000000000..338b77bcfc2 --- /dev/null +++ b/pkg/kubelet/config/config_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" +) + +// TODO: remove this +func expectError(t *testing.T, err error) { + if err == nil { + t.Errorf("Expected error, Got %v", err) + } +} + +// TODO: remove this +func expectNoError(t *testing.T, err error) { + if err != nil { + t.Errorf("Expected no error, Got %v", err) + } +} + +func expectEmptyChannel(t *testing.T, ch <-chan interface{}) { + select { + case update := <-ch: + t.Errorf("Expected no update in channel, Got %v", update) + default: + } +} + +type sortedPods []kubelet.Pod + +func (s sortedPods) Len() int { + return len(s) +} +func (s sortedPods) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s sortedPods) Less(i, j int) bool { + if s[i].Namespace < s[j].Namespace { + return true + } + return s[i].Name < s[j].Name +} + +func CreateValidPod(name, namespace string) kubelet.Pod { + return kubelet.Pod{ + Name: name, + Namespace: namespace, + Manifest: api.ContainerManifest{ + Version: "v1beta1", + }, + } +} + +func CreatePodUpdate(op kubelet.PodOperation, pods ...kubelet.Pod) kubelet.PodUpdate { + newPods := make([]kubelet.Pod, len(pods)) + for i := range pods { + newPods[i] = pods[i] + } + return kubelet.PodUpdate{newPods, op} +} + +func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) { + config := NewPodConfig(mode) + channel := config.Channel("test") + ch := config.Updates() + return channel, ch, config +} + +func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kubelet.PodUpdate) { + for i := range expected { + update := <-ch + if !reflect.DeepEqual(expected[i], update) { + t.Fatalf("Expected %#v, Got %#v", expected[i], update) + } + } + expectNoPodUpdate(t, ch) +} + +func expectNoPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate) { + select { + case update := <-ch: + t.Errorf("Expected no update in channel, Got %#v", update) + default: + } +} + +func TestNewPodAdded(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) +} + +func TestInvalidPodFiltered(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + + // add an invalid update + podUpdate = CreatePodUpdate(kubelet.UPDATE, kubelet.Pod{Name: "foo"}) + channel <- podUpdate + expectNoPodUpdate(t, ch) +} + +func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) + + // see an set + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + // container updates are separated as UPDATE + pod := podUpdate.Pods[0] + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + channel <- CreatePodUpdate(kubelet.ADD, pod) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) +} + +func TestNewPodAddedSnapshot(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) + + // see an set + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + + // container updates are separated as UPDATE + pod := podUpdate.Pods[0] + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + channel <- CreatePodUpdate(kubelet.ADD, pod) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod)) +} + +func TestNewPodAddedUpdatedRemoved(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // should register an add + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + + // should ignore ADDs that are identical + expectNoPodUpdate(t, ch) + + // an kubelet.ADD should be converted to kubelet.UPDATE + pod := CreateValidPod("foo", "test") + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + podUpdate = CreatePodUpdate(kubelet.ADD, pod) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) + + podUpdate = CreatePodUpdate(kubelet.REMOVE, kubelet.Pod{Name: "foo"}) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod)) +} + +func TestNewPodAddedUpdatedSet(t *testing.T) { + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // should register an add + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""), CreateValidPod("foo2", ""), CreateValidPod("foo3", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"), CreateValidPod("foo2", "test"), CreateValidPod("foo3", "test"))) + + // should ignore ADDs that are identical + expectNoPodUpdate(t, ch) + + // should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE + pod := CreateValidPod("foo2", "test") + pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", ""), CreateValidPod("foo4", "test")) + channel <- podUpdate + expectPodUpdate(t, ch, + CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "test")), + CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "test")), + CreatePodUpdate(kubelet.UPDATE, pod)) +} diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go new file mode 100644 index 00000000000..48dc7b411b4 --- /dev/null +++ b/pkg/kubelet/config/etcd.go @@ -0,0 +1,140 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from etcd using the Kubernetes etcd schema +package config + +import ( + "fmt" + "path" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" + "gopkg.in/v1/yaml" +) + +func EtcdKeyForHost(hostname string) string { + return path.Join("/", "registry", "hosts", hostname, "kubelet") +} + +// TODO(lavalamp): Use a watcher interface instead of the etcd client directly +type SourceEtcd struct { + key string + client tools.EtcdClient + updates chan<- interface{} + + waitDuration time.Duration +} + +// NewSourceEtcd creates a config source that watches and pulls from a key in etcd +func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, updates chan<- interface{}) *SourceEtcd { + config := &SourceEtcd{ + key: key, + client: client, + updates: updates, + + waitDuration: period, + } + glog.Infof("Watching etcd for %s", key) + go util.Forever(config.run, period) + return config +} + +// run loops forever looking for changes to a key in etcd +func (s *SourceEtcd) run() { + index := uint64(0) + for { + lastIndex, err := s.fetchNextState(index) + if err != nil { + if !tools.IsEtcdNotFound(err) { + glog.Errorf("Unable to extract from the response (%s): %%v", s.key, err) + } + return + } + index = lastIndex + 1 + } +} + +// fetchNextState fetches the key (or waits for a change to a key) and then returns +// the index read. It will watch no longer than s.waitDuration and then return +func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err error) { + var response *etcd.Response + + if fromIndex == 0 { + response, err = s.client.Get(s.key, true, false) + } else { + response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.waitDuration)) + if tools.IsEtcdWatchStoppedByUser(err) { + return fromIndex, nil + } + } + if err != nil { + return 0, err + } + + pods, err := responseToPods(response) + if err != nil { + glog.Infof("Response was in error: %#v", response) + return 0, fmt.Errorf("error parsing response: %#v", err) + } + + glog.Infof("Got state from etcd: %+v", pods) + s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + + return response.Node.ModifiedIndex, nil +} + +// responseToPods takes an etcd Response object, and turns it into a structured list of containers. +// It returns a list of containers, or an error if one occurs. +func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) { + pods := []kubelet.Pod{} + if response.Node == nil || len(response.Node.Value) == 0 { + return pods, fmt.Errorf("no nodes field: %v", response) + } + + manifests := []api.ContainerManifest{} + if err := yaml.Unmarshal([]byte(response.Node.Value), &manifests); err != nil { + return pods, fmt.Errorf("could not unmarshal manifests: %v", err) + } + + for i, manifest := range manifests { + name := manifest.ID + if name == "" { + name = fmt.Sprintf("_%d", i+1) + } + pods = append(pods, kubelet.Pod{Name: name, Manifest: manifest}) + } + return pods, nil +} + +// stopChannel creates a channel that is closed after a duration for use with etcd client API +func stopChannel(until time.Duration) chan bool { + stop := make(chan bool) + go func() { + select { + case <-time.After(until): + } + stop <- true + close(stop) + }() + return stop + +} diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go new file mode 100644 index 00000000000..462ac77ea2d --- /dev/null +++ b/pkg/kubelet/config/etcd_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "reflect" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/coreos/go-etcd/etcd" +) + +// TODO(lavalamp): Use the etcd watcher from the tools package, and make sure all test cases here are tested there. + +func TestGetEtcdData(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), + ModifiedIndex: 1, + }, + }, + E: nil, + } + NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, time.Millisecond, ch) + + //TODO: update FakeEtcdClient.Watch to handle receiver=nil with a given index + //returns an infinite stream of updates + for i := 0; i < 2; i++ { + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } + } +} + +func TestGetEtcdNoData(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: nil, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + _, err := c.fetchNextState(0) + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestGetEtcd(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), + ModifiedIndex: 1, + }, + }, + E: nil, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + lastIndex, err := c.fetchNextState(0) + expectNoError(t, err) + if lastIndex != 1 { + t.Errorf("Expected %#v, Got %#v", 1, lastIndex) + } + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestWatchEtcd(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: util.MakeJSONString([]api.Container{}), + ModifiedIndex: 2, + }, + }, + E: nil, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + lastIndex, err := c.fetchNextState(1) + expectNoError(t, err) + if lastIndex != 2 { + t.Errorf("Expected %d, Got %d", 1, lastIndex) + } + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestGetEtcdNotFound(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + _, err := c.fetchNextState(0) + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestGetEtcdError(t *testing.T) { + fakeClient := tools.MakeFakeEtcdClient(t) + ch := make(chan interface{}, 1) + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: &etcd.EtcdError{ + ErrorCode: 200, // non not found error + }, + } + c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} + _, err := c.fetchNextState(0) + expectError(t, err) + expectEmptyChannel(t, ch) +} diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go new file mode 100644 index 00000000000..6128b8d8bfd --- /dev/null +++ b/pkg/kubelet/config/file.go @@ -0,0 +1,149 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from file or a directory of files +package config + +import ( + "crypto/sha1" + "encoding/base64" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" + "gopkg.in/v1/yaml" +) + +type SourceFile struct { + path string + updates chan<- interface{} +} + +func NewSourceFile(path string, period time.Duration, updates chan<- interface{}) *SourceFile { + config := &SourceFile{ + path: path, + updates: updates, + } + glog.Infof("Watching file %s", path) + go util.Forever(config.run, period) + return config +} + +func (s *SourceFile) run() { + if err := s.extractFromPath(); err != nil { + glog.Errorf("Unable to read config file: %s", err) + } +} + +func (s *SourceFile) extractFromPath() error { + path := s.path + statInfo, err := os.Stat(path) + if err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("unable to access path: %s", err) + } + return fmt.Errorf("path does not exist: %s", path) + } + + switch { + case statInfo.Mode().IsDir(): + pods, err := extractFromDir(path) + if err != nil { + return err + } + s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + + case statInfo.Mode().IsRegular(): + pod, err := extractFromFile(path) + if err != nil { + return err + } + s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + + default: + return fmt.Errorf("path is not a directory or file") + } + + return nil +} + +func extractFromDir(name string) ([]kubelet.Pod, error) { + pods := []kubelet.Pod{} + + files, err := filepath.Glob(filepath.Join(name, "[^.]*")) + if err != nil { + return pods, err + } + + sort.Strings(files) + + for _, file := range files { + pod, err := extractFromFile(file) + if err != nil { + return []kubelet.Pod{}, err + } + pods = append(pods, pod) + } + return pods, nil +} + +func extractFromFile(name string) (kubelet.Pod, error) { + var pod kubelet.Pod + + file, err := os.Open(name) + if err != nil { + return pod, err + } + defer file.Close() + + data, err := ioutil.ReadAll(file) + if err != nil { + glog.Errorf("Couldn't read from file: %v", err) + return pod, err + } + + if err := yaml.Unmarshal(data, &pod.Manifest); err != nil { + return pod, fmt.Errorf("could not unmarshal manifest: %v", err) + } + + podName := pod.Manifest.ID + if podName == "" { + podName = simpleSubdomainSafeHash(name) + } + pod.Name = podName + + return pod, nil +} + +var simpleSubdomainSafeEncoding = base64.NewEncoding("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ012345678900") + +// simpleSubdomainSafeHash generates a compact hash of the input that uses characters +// only in the range a-zA-Z0-9, making it suitable for DNS subdomain labels +func simpleSubdomainSafeHash(s string) string { + hasher := sha1.New() + hasher.Write([]byte(s)) + sha := simpleSubdomainSafeEncoding.EncodeToString(hasher.Sum(nil)) + if len(sha) > 20 { + sha = sha[:20] + } + return sha +} diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go new file mode 100644 index 00000000000..6909eca5efb --- /dev/null +++ b/pkg/kubelet/config/file_test.go @@ -0,0 +1,211 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "encoding/json" + "io/ioutil" + "os" + "reflect" + "sort" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "gopkg.in/v1/yaml" +) + +func TestExtractFromNonExistentFile(t *testing.T) { + ch := make(chan interface{}, 1) + c := SourceFile{"/some/fake/file", ch} + err := c.extractFromPath() + expectError(t, err) +} + +func TestUpdateOnNonExistentFile(t *testing.T) { + ch := make(chan interface{}) + NewSourceFile("random_non_existent_path", time.Millisecond, ch) + select { + case got := <-ch: + t.Errorf("Expected no update, Got %#v", got) + case <-time.After(2 * time.Millisecond): + } +} + +func writeTestFile(t *testing.T, dir, name string, contents string) *os.File { + file, err := ioutil.TempFile(os.TempDir(), "test_pod_config") + if err != nil { + t.Fatalf("Unable to create test file %#v", err) + } + file.Close() + if err := ioutil.WriteFile(file.Name(), []byte(contents), 0555); err != nil { + t.Fatalf("Unable to write test file %#v", err) + } + return file +} + +func TestReadFromFile(t *testing.T) { + file := writeTestFile(t, os.TempDir(), "test_pod_config", "version: v1beta1\nid: test\ncontainers:\n- image: test/image") + defer os.Remove(file.Name()) + + ch := make(chan interface{}) + NewSourceFile(file.Name(), time.Millisecond, ch) + select { + case got := <-ch: + update := got.(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{ + Name: "test", + Manifest: api.ContainerManifest{ + ID: "test", + Version: "v1beta1", + Containers: []api.Container{api.Container{ + Image: "test/image"}, + }, + }, + }) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } + + case <-time.After(2 * time.Millisecond): + t.Errorf("Expected update, timeout instead") + } +} + +func TestExtractFromBadDataFile(t *testing.T) { + file := writeTestFile(t, os.TempDir(), "test_pod_config", string([]byte{1, 2, 3})) + defer os.Remove(file.Name()) + + ch := make(chan interface{}, 1) + c := SourceFile{file.Name(), ch} + err := c.extractFromPath() + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestExtractFromValidDataFile(t *testing.T) { + manifest := api.ContainerManifest{ID: ""} + + text, err := json.Marshal(manifest) + expectNoError(t, err) + file := writeTestFile(t, os.TempDir(), "test_pod_config", string(text)) + defer os.Remove(file.Name()) + + ch := make(chan interface{}, 1) + c := SourceFile{file.Name(), ch} + err = c.extractFromPath() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: simpleSubdomainSafeHash(file.Name()), Manifest: manifest}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestExtractFromEmptyDir(t *testing.T) { + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + defer os.RemoveAll(dirName) + + ch := make(chan interface{}, 1) + c := SourceFile{dirName, ch} + err = c.extractFromPath() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +func TestExtractFromDir(t *testing.T) { + manifests := []api.ContainerManifest{ + {ID: "", Containers: []api.Container{{Image: "foo"}}}, + {ID: "", Containers: []api.Container{{Image: "bar"}}}, + } + files := make([]*os.File, len(manifests)) + + dirName, err := ioutil.TempDir("", "foo") + expectNoError(t, err) + + for i, manifest := range manifests { + data, err := json.Marshal(manifest) + expectNoError(t, err) + file, err := ioutil.TempFile(dirName, manifest.ID) + expectNoError(t, err) + name := file.Name() + expectNoError(t, file.Close()) + ioutil.WriteFile(name, data, 0755) + files[i] = file + } + + ch := make(chan interface{}, 1) + c := SourceFile{dirName, ch} + err = c.extractFromPath() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate( + kubelet.SET, + kubelet.Pod{Name: simpleSubdomainSafeHash(files[0].Name()), Manifest: manifests[0]}, + kubelet.Pod{Name: simpleSubdomainSafeHash(files[1].Name()), Manifest: manifests[1]}, + ) + sort.Sort(sortedPods(update.Pods)) + sort.Sort(sortedPods(expected.Pods)) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } +} + +// These are used for testing extract json (below) +type TestData struct { + Value string + Number int +} + +type TestObject struct { + Name string + Data TestData +} + +func verifyStringEquals(t *testing.T, actual, expected string) { + if actual != expected { + t.Errorf("Verification failed. Expected: %s, Found %s", expected, actual) + } +} + +func verifyIntEquals(t *testing.T, actual, expected int) { + if actual != expected { + t.Errorf("Verification failed. Expected: %d, Found %d", expected, actual) + } +} + +func TestExtractJSON(t *testing.T) { + obj := TestObject{} + data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }` + + if err := yaml.Unmarshal([]byte(data), &obj); err != nil { + t.Fatalf("Could not unmarshal JSON: %v", err) + } + + verifyStringEquals(t, obj.Name, "foo") + verifyStringEquals(t, obj.Data.Value, "bar") + verifyIntEquals(t, obj.Data.Number, 10) +} diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go new file mode 100644 index 00000000000..b28f68056e7 --- /dev/null +++ b/pkg/kubelet/config/http.go @@ -0,0 +1,117 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from an HTTP GET response +package config + +import ( + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" + "gopkg.in/v1/yaml" +) + +type SourceURL struct { + url string + updates chan<- interface{} +} + +func NewSourceURL(url string, period time.Duration, updates chan<- interface{}) *SourceURL { + config := &SourceURL{ + url: url, + updates: updates, + } + glog.Infof("Watching URL %s", url) + go util.Forever(config.run, period) + return config +} + +func (s *SourceURL) run() { + if err := s.extractFromURL(); err != nil { + glog.Errorf("Failed to read URL: %s", err) + } +} + +func (s *SourceURL) extractFromURL() error { + resp, err := http.Get(s.url) + if err != nil { + return err + } + defer resp.Body.Close() + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if len(data) == 0 { + return fmt.Errorf("zero-length data received from %v", s.url) + } + + // First try as if it's a single manifest + var pod kubelet.Pod + singleErr := yaml.Unmarshal(data, &pod.Manifest) + // TODO: replace with validation + if singleErr == nil && pod.Manifest.Version == "" { + // If data is a []ContainerManifest, trying to put it into a ContainerManifest + // will not give an error but also won't set any of the fields. + // Our docs say that the version field is mandatory, so using that to judge wether + // this was actually successful. + singleErr = fmt.Errorf("got blank version field") + } + + if singleErr == nil { + name := pod.Manifest.ID + if name == "" { + name = "1" + } + pod.Name = name + s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + return nil + } + + // That didn't work, so try an array of manifests. + var manifests []api.ContainerManifest + multiErr := yaml.Unmarshal(data, &manifests) + // We're not sure if the person reading the logs is going to care about the single or + // multiple manifest unmarshalling attempt, so we need to put both in the logs, as is + // done at the end. Hence not returning early here. + if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" { + multiErr = fmt.Errorf("got blank version field") + } + if multiErr == nil { + pods := []kubelet.Pod{} + for i := range manifests { + pod := kubelet.Pod{Manifest: manifests[i]} + name := pod.Manifest.ID + if name == "" { + name = fmt.Sprintf("%d", i+1) + } + pod.Name = name + pods = append(pods, pod) + } + s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + return nil + } + + return fmt.Errorf("%v: received '%v', but couldn't parse as a "+ + "single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n", + s.url, string(data), singleErr, pod.Manifest, multiErr, manifests) +} diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go new file mode 100644 index 00000000000..b1306f06116 --- /dev/null +++ b/pkg/kubelet/config/http_test.go @@ -0,0 +1,124 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "encoding/json" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func TestURLErrorNotExistNoUpdate(t *testing.T) { + ch := make(chan interface{}) + NewSourceURL("http://localhost:49575/_not_found_", time.Millisecond, ch) + select { + case got := <-ch: + t.Errorf("Expected no update, Got %#v", got) + case <-time.After(2 * time.Millisecond): + } +} + +func TestExtractFromHttpBadness(t *testing.T) { + ch := make(chan interface{}, 1) + c := SourceURL{"http://localhost:49575/_not_found_", ch} + err := c.extractFromURL() + expectError(t, err) + expectEmptyChannel(t, ch) +} + +func TestExtractFromHttpSingle(t *testing.T) { + manifests := []api.ContainerManifest{ + {Version: "v1beta1", ID: "foo"}, + } + // Taking a single-manifest from a URL allows kubelet to be used + // in the implementation of google's container VM image. + data, err := json.Marshal(manifests[0]) + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + ch := make(chan interface{}, 1) + c := SourceURL{testServer.URL, ch} + + err = c.extractFromURL() + expectNoError(t, err) + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: manifests[0]}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected: %#v, Got: %#v", expected, update) + } +} + +func TestExtractFromHttpMultiple(t *testing.T) { + manifests := []api.ContainerManifest{ + {Version: "v1beta1", ID: ""}, + {Version: "v1beta1", ID: "bar"}, + } + data, err := json.Marshal(manifests) + if err != nil { + t.Fatalf("Some weird json problem: %v", err) + } + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + ch := make(chan interface{}, 1) + c := SourceURL{testServer.URL, ch} + + err = c.extractFromURL() + expectNoError(t, err) + + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "1", Manifest: manifests[0]}, kubelet.Pod{Name: "bar", Manifest: manifests[1]}) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected: %#v, Got: %#v", expected, update) + } +} + +func TestExtractFromHttpEmptyArray(t *testing.T) { + manifests := []api.ContainerManifest{} + data, err := json.Marshal(manifests) + if err != nil { + t.Fatalf("Some weird json problem: %v", err) + } + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + ch := make(chan interface{}, 1) + c := SourceURL{testServer.URL, ch} + + err = c.extractFromURL() + expectNoError(t, err) + update := (<-ch).(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET) + if !reflect.DeepEqual(expected, update) { + t.Errorf("Expected: %#v, Got: %#v", expected, update) + } +} diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go new file mode 100644 index 00000000000..ca5ba15ccdc --- /dev/null +++ b/pkg/kubelet/types.go @@ -0,0 +1,60 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// Pod represents the structure of a pod on the Kubelet, distinct from the apiserver +// representation of a Pod. +type Pod struct { + Namespace string + Name string + Manifest api.ContainerManifest +} + +// PodOperation defines what changes will be made on a pod configuration. +type PodOperation int + +const ( + // This is the current pod configuration + SET PodOperation = iota + // Pods with the given ids are new to this source + ADD + // Pods with the given ids have been removed from this source + REMOVE + // Pods with the given ids have been updated in this source + UPDATE +) + +// PodUpdate defines an operation sent on the channel. You can add or remove single services by +// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required). +// For setting the state of the system to a given state for this source configuration, set +// Pods as desired and Op to SET, which will reset the system state to that specified in this +// operation for this source channel. To remove all pods, set Pods to empty array and Op to SET. +type PodUpdate struct { + Pods []Pod + Op PodOperation +} + +//GetPodFullName returns a name that full identifies a pod across all config sources. +func GetPodFullName(pod *Pod) string { + return fmt.Sprintf("%s.%s", pod.Name, pod.Namespace) +} diff --git a/pkg/kubelet/validation.go b/pkg/kubelet/validation.go new file mode 100644 index 00000000000..710c14bb9b0 --- /dev/null +++ b/pkg/kubelet/validation.go @@ -0,0 +1,36 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func makeInvalidError(field string, value interface{}) api.ValidationError { + return api.ValidationError{api.ErrTypeInvalid, field, value} +} + +func ValidatePod(pod *Pod) (errors []error) { + if !util.IsDNSSubdomain(pod.Name) { + errors = append(errors, makeInvalidError("Pod.Name", pod.Name)) + } + if errs := api.ValidateManifest(&pod.Manifest); len(errs) != 0 { + errors = append(errors, errs...) + } + return errors +} diff --git a/pkg/kubelet/validation_test.go b/pkg/kubelet/validation_test.go new file mode 100644 index 00000000000..dfb02fa2f23 --- /dev/null +++ b/pkg/kubelet/validation_test.go @@ -0,0 +1,42 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet_test + +import ( + "strings" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + . "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" +) + +func TestValidatePodNoName(t *testing.T) { + errorCases := map[string]Pod{ + // manifest is tested in api/validation_test.go, ensure it is invoked + "empty version": {Name: "test", Manifest: api.ContainerManifest{Version: "", ID: "abc"}}, + + // Name + "zero-length name": {Name: "", Manifest: api.ContainerManifest{Version: "v1beta1"}}, + "name > 255 characters": {Name: strings.Repeat("a", 256), Manifest: api.ContainerManifest{Version: "v1beta1"}}, + "name not a DNS subdomain": {Name: "a.b.c.", Manifest: api.ContainerManifest{Version: "v1beta1"}}, + } + for k, v := range errorCases { + if errs := ValidatePod(&v); len(errs) == 0 { + t.Errorf("expected failure for %s", k) + } + } +} diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 1d3e39d03e4..1c6a3fc8eea 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -70,6 +70,11 @@ func IsEtcdConflict(err error) bool { return isEtcdErrorNum(err, 101) } +// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. +func IsEtcdWatchStoppedByUser(err error) bool { + return etcd.ErrWatchStoppedByUser == err +} + // Returns true iff err is an etcd error, whose errorCode matches errorCode func isEtcdErrorNum(err error, errorCode int) bool { etcdError, ok := err.(*etcd.EtcdError) diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 6dd3c38ec99..5979e71daee 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -124,6 +124,10 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, defer close(injectedError) f.WatchInjectError = injectedError + if receiver == nil { + return f.Get(prefix, false, recursive) + } + f.watchCompletedChan <- true select { case <-stop: From 7767c2a2ace79f55d1f8e4cb75eb1e3bd48d2db9 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 15 Jul 2014 16:24:41 -0400 Subject: [PATCH 2/2] Switch the Kubelet to use kubelet/config Also transfer the Kubelet from using ContainerManifest.ID to source specific identifiers with namespacing. Move goroutine behavior out of kubelet/ and into integration.go and cmd/kubelet/kubelet.go for better isolation. --- cmd/integration/integration.go | 49 ++-- cmd/kubelet/kubelet.go | 65 ++++- pkg/kubelet/docker.go | 8 +- pkg/kubelet/kubelet.go | 508 +++++++------------------------- pkg/kubelet/kubelet_test.go | 513 +++++---------------------------- pkg/kubelet/server.go | 67 +++-- pkg/kubelet/server_test.go | 28 +- 7 files changed, 329 insertions(+), 909 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index d16947e78ac..07fb02ffc7c 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -32,6 +32,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -85,6 +86,9 @@ func startComponents(manifestURL string) (apiServerURL string) { handler := delegateHandler{} apiserver := httptest.NewServer(&handler) + + etcdClient := etcd.NewClient(servers) + cl := client.New(apiserver.URL, nil) cl.PollPeriod = time.Second * 1 cl.Sync = true @@ -93,32 +97,39 @@ func startComponents(manifestURL string) (apiServerURL string) { m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "", cl) handler.delegate = m.ConstructHandler("/api/v1beta1") - controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), cl) - + controllerManager := controller.MakeReplicationManager(etcdClient, cl) controllerManager.Run(1 * time.Second) - // Kubelet - myKubelet := kubelet.Kubelet{ - Hostname: machineList[0], - DockerClient: &fakeDocker1, - DockerPuller: &kubelet.FakeDockerPuller{}, - FileCheckFrequency: 5 * time.Second, - SyncFrequency: 5 * time.Second, - HTTPCheckFrequency: 5 * time.Second, + // Kubelet (localhost) + cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) + config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd")) + config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url")) + myKubelet := &kubelet.Kubelet{ + Hostname: machineList[0], + DockerClient: &fakeDocker1, + DockerPuller: &kubelet.FakeDockerPuller{}, } - go myKubelet.RunKubelet("", "", manifestURL, servers, "localhost", 10250) + go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) + go util.Forever(cfg1.Sync, 3*time.Second) + go util.Forever(func() { + kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250) + }, 0) + // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. - otherKubelet := kubelet.Kubelet{ - Hostname: machineList[1], - DockerClient: &fakeDocker2, - DockerPuller: &kubelet.FakeDockerPuller{}, - FileCheckFrequency: 5 * time.Second, - SyncFrequency: 5 * time.Second, - HTTPCheckFrequency: 5 * time.Second, + cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) + config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd")) + otherKubelet := &kubelet.Kubelet{ + Hostname: machineList[1], + DockerClient: &fakeDocker2, + DockerPuller: &kubelet.FakeDockerPuller{}, } - go otherKubelet.RunKubelet("", "", "", servers, "localhost", 10251) + go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) + go util.Forever(cfg2.Sync, 3*time.Second) + go util.Forever(func() { + kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251) + }, 0) return apiserver.URL } diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index c570fcd99d7..45c918ebc33 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -23,16 +23,20 @@ package main import ( "flag" "math/rand" + "net/http" "os" "os/exec" + "strings" "time" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" "github.com/golang/glog" + "github.com/google/cadvisor/client" ) var ( @@ -77,7 +81,7 @@ func getHostname() string { } hostname = fqdn } - return string(hostname) + return strings.TrimSpace(string(hostname)) } func main() { @@ -93,12 +97,57 @@ func main() { glog.Fatal("Couldn't connect to docker.") } - k := kubelet.Kubelet{ - Hostname: getHostname(), - DockerClient: dockerClient, - FileCheckFrequency: *fileCheckFrequency, - SyncFrequency: *syncFrequency, - HTTPCheckFrequency: *httpCheckFrequency, + cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:5000") + if err != nil { + glog.Errorf("Error on creating cadvisor client: %v", err) } - k.RunKubelet(*dockerEndpoint, *config, *manifestURL, etcdServerList, *address, *port) + + hostname := getHostname() + + k := &kubelet.Kubelet{ + Hostname: hostname, + DockerClient: dockerClient, + CadvisorClient: cadvisorClient, + } + + // source of all configuration + cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates) + + // define file config source + if *config != "" { + kconfig.NewSourceFile(*config, *fileCheckFrequency, cfg.Channel("file")) + } + + // define url config source + if *manifestURL != "" { + kconfig.NewSourceURL(*manifestURL, *httpCheckFrequency, cfg.Channel("http")) + } + + // define etcd config source and initialize etcd client + if len(etcdServerList) > 0 { + glog.Infof("Watching for etcd configs at %v", etcdServerList) + k.EtcdClient = etcd.NewClient(etcdServerList) + kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), k.EtcdClient, 30*time.Second, cfg.Channel("etcd")) + } + + // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop + // up into "per source" synchronizations + + // start the kubelet + go util.Forever(func() { k.Run(cfg.Updates()) }, 0) + + // resynchronize periodically + // TODO: make this part of PodConfig so that it is only delivered after syncFrequency has elapsed without + // an update + go util.Forever(cfg.Sync, *syncFrequency) + + // start the kubelet server + if *address != "" { + go util.Forever(func() { + kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), http.DefaultServeMux, *address, *port) + }, 0) + } + + // runs forever + select {} } diff --git a/pkg/kubelet/docker.go b/pkg/kubelet/docker.go index 7399c5d5e3a..4c02a09639b 100644 --- a/pkg/kubelet/docker.go +++ b/pkg/kubelet/docker.go @@ -174,14 +174,14 @@ func unescapeDash(in string) (out string) { const containerNamePrefix = "k8s" // Creates a name which can be reversed to identify both manifest id and container name. -func buildDockerName(manifest *api.ContainerManifest, container *api.Container) string { +func buildDockerName(pod *Pod, container *api.Container) string { // Note, manifest.ID could be blank. - return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(manifest.ID), rand.Uint32()) + return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32()) } // Upacks a container name, returning the manifest id and container name we would have used to // construct the docker name. If the docker name isn't one we created, we may return empty strings. -func parseDockerName(name string) (manifestID, containerName string) { +func parseDockerName(name string) (podFullName, containerName string) { // For some reason docker appears to be appending '/' to names. // If its there, strip it. if name[0] == '/' { @@ -195,7 +195,7 @@ func parseDockerName(name string) (manifestID, containerName string) { containerName = unescapeDash(parts[1]) } if len(parts) > 2 { - manifestID = unescapeDash(parts[2]) + podFullName = unescapeDash(parts[2]) } return } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b099a156ad1..a332c4194f9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -20,13 +20,7 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" - "net" "net/http" - "os" - "path" - "path/filepath" - "sort" "strconv" "strings" "sync" @@ -40,9 +34,7 @@ import ( "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" "github.com/golang/glog" - "github.com/google/cadvisor/client" "github.com/google/cadvisor/info" - "gopkg.in/v1/yaml" ) const defaultChanSize = 1024 @@ -58,6 +50,13 @@ type CadvisorInterface interface { MachineInfo() (*info.MachineInfo, error) } +// SyncHandler is an interface implemented by Kubelet, for testability +type SyncHandler interface { + SyncPods([]Pod) error +} + +type volumeMap map[string]volume.Interface + // New creates a new Kubelet. // TODO: currently it is only called by test code. // Need cleanup. @@ -65,94 +64,35 @@ func New() *Kubelet { return &Kubelet{} } -type volumeMap map[string]volume.Interface - // Kubelet is the main kubelet implementation. type Kubelet struct { - Hostname string - EtcdClient tools.EtcdClient - DockerClient DockerInterface - DockerPuller DockerPuller - CadvisorClient CadvisorInterface - FileCheckFrequency time.Duration - SyncFrequency time.Duration - HTTPCheckFrequency time.Duration - pullLock sync.Mutex - HealthChecker health.HealthChecker - LogServer http.Handler + Hostname string + DockerClient DockerInterface + + // Optional, no events will be sent without it + EtcdClient tools.EtcdClient + // Optional, no statistics will be available if omitted + CadvisorClient CadvisorInterface + // Optional, defaults to simple implementaiton + HealthChecker health.HealthChecker + // Optional, defaults to simple Docker implementation + DockerPuller DockerPuller + // Optional, defaults to /logs/ from /var/log + LogServer http.Handler } -type manifestUpdate struct { - source string - manifests []api.ContainerManifest -} - -const ( - fileSource = "file" - etcdSource = "etcd" - httpClientSource = "http_client" - httpServerSource = "http_server" -) - -// RunKubelet starts background goroutines. If config_path, manifest_url, or address are empty, -// they are not watched. Never returns. -func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL string, etcdServers []string, address string, port uint) { +// Run starts the kubelet reacting to config updates +func (kl *Kubelet) Run(updates <-chan PodUpdate) { if kl.LogServer == nil { kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) } - if kl.CadvisorClient == nil { - var err error - kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000") - if err != nil { - glog.Errorf("Error on creating cadvisor client: %v", err) - } - } if kl.DockerPuller == nil { kl.DockerPuller = NewDockerPuller(kl.DockerClient) } - updateChannel := make(chan manifestUpdate) - if configPath != "" { - glog.Infof("Watching for file configs at %s", configPath) - go util.Forever(func() { - kl.WatchFiles(configPath, updateChannel) - }, kl.FileCheckFrequency) + if kl.HealthChecker == nil { + kl.HealthChecker = health.NewHealthChecker() } - if manifestURL != "" { - glog.Infof("Watching for HTTP configs at %s", manifestURL) - go util.Forever(func() { - if err := kl.extractFromHTTP(manifestURL, updateChannel); err != nil { - glog.Errorf("Error syncing http: %v", err) - } - }, kl.HTTPCheckFrequency) - } - if len(etcdServers) > 0 { - glog.Infof("Watching for etcd configs at %v", etcdServers) - kl.EtcdClient = etcd.NewClient(etcdServers) - go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second) - } - if address != "" { - glog.Infof("Starting to listen on %s:%d", address, port) - handler := Server{ - Kubelet: kl, - UpdateChannel: updateChannel, - DelegateHandler: http.DefaultServeMux, - } - s := &http.Server{ - Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)), - Handler: &handler, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - MaxHeaderBytes: 1 << 20, - } - go util.Forever(func() { s.ListenAndServe() }, 0) - } - kl.HealthChecker = health.NewHealthChecker() - kl.syncLoop(updateChannel, kl) -} - -// SyncHandler is an interface implemented by Kubelet, for testability -type SyncHandler interface { - SyncManifests([]api.ContainerManifest) error + kl.syncLoop(updates, kl) } // LogEvent logs an event to the etcd backend. @@ -186,7 +126,7 @@ func makeEnvironmentVariables(container *api.Container) []string { return result } -func makeVolumesAndBinds(manifestID string, container *api.Container, podVolumes volumeMap) (map[string]struct{}, []string) { +func makeVolumesAndBinds(pod *Pod, container *api.Container, podVolumes volumeMap) (map[string]struct{}, []string) { volumes := map[string]struct{}{} binds := []string{} for _, volume := range container.VolumeMounts { @@ -201,7 +141,7 @@ func makeVolumesAndBinds(manifestID string, container *api.Container, podVolumes // TODO(jonesdl) This clause should be deleted and an error should be thrown. The default // behavior is now supported by the EmptyDirectory type. volumes[volume.MountPath] = struct{}{} - basePath = fmt.Sprintf("/exports/%s/%s:%s", manifestID, volume.Name, volume.MountPath) + basePath = fmt.Sprintf("/exports/%s/%s:%s", GetPodFullName(pod), volume.Name, volume.MountPath) } if volume.ReadOnly { basePath += ":ro" @@ -268,14 +208,14 @@ func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volume return podVolumes, nil } -// Run a single container from a manifest. Returns the docker container ID -func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) { +// Run a single container from a pod. Returns the docker container ID +func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) { envVariables := makeEnvironmentVariables(container) - volumes, binds := makeVolumesAndBinds(manifest.ID, container, podVolumes) + volumes, binds := makeVolumesAndBinds(pod, container, podVolumes) exposedPorts, portBindings := makePortsAndBindings(container) opts := docker.CreateContainerOptions{ - Name: buildDockerName(manifest, container), + Name: buildDockerName(pod, container), Config: &docker.Config{ Cmd: container.Command, Env: envVariables, @@ -301,13 +241,14 @@ func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api. } // Kill a docker container -func (kl *Kubelet) killContainer(container docker.APIContainers) error { - err := kl.DockerClient.StopContainer(container.ID, 10) - manifestID, containerName := parseDockerName(container.Names[0]) +func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error { + err := kl.DockerClient.StopContainer(dockerContainer.ID, 10) + podFullName, containerName := parseDockerName(dockerContainer.Names[0]) kl.LogEvent(&api.Event{ Event: "STOP", Manifest: &api.ContainerManifest{ - ID: manifestID, + //TODO: This should be reported using either the apiserver schema or the kubelet schema + ID: podFullName, }, Container: &api.Container{ Name: containerName, @@ -317,247 +258,17 @@ func (kl *Kubelet) killContainer(container docker.APIContainers) error { return err } -func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) { - var file *os.File - var err error - var manifest api.ContainerManifest - - if file, err = os.Open(name); err != nil { - return manifest, err - } - defer file.Close() - - data, err := ioutil.ReadAll(file) - if err != nil { - glog.Errorf("Couldn't read from file: %v", err) - return manifest, err - } - if err = kl.ExtractYAMLData(data, &manifest); err != nil { - return manifest, err - } - return manifest, nil -} - -func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) { - var manifests []api.ContainerManifest - - files, err := filepath.Glob(filepath.Join(name, "[^.]*")) - if err != nil { - return manifests, err - } - - sort.Strings(files) - - for _, file := range files { - manifest, err := kl.extractFromFile(file) - if err != nil { - return manifests, err - } - manifests = append(manifests, manifest) - } - return manifests, nil -} - -// WatchFiles watches a file or direcory of files for changes to the set of pods that -// should run on this Kubelet. -func (kl *Kubelet) WatchFiles(configPath string, updateChannel chan<- manifestUpdate) { - statInfo, err := os.Stat(configPath) - if err != nil { - if !os.IsNotExist(err) { - glog.Errorf("Error accessing path: %v", err) - } - return - } - - switch { - case statInfo.Mode().IsDir(): - manifests, err := kl.extractFromDir(configPath) - if err != nil { - glog.Errorf("Error polling dir: %v", err) - return - } - updateChannel <- manifestUpdate{fileSource, manifests} - case statInfo.Mode().IsRegular(): - manifest, err := kl.extractFromFile(configPath) - if err != nil { - glog.Errorf("Error polling file: %v", err) - return - } - updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}} - default: - glog.Errorf("Error accessing config - not a directory or file") - } -} - -func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error { - resp, err := http.Get(url) - if err != nil { - return err - } - defer resp.Body.Close() - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - if len(data) == 0 { - return fmt.Errorf("zero-length data received from %v", url) - } - - // First try as if it's a single manifest - var manifest api.ContainerManifest - singleErr := yaml.Unmarshal(data, &manifest) - if singleErr == nil && manifest.Version == "" { - // If data is a []ContainerManifest, trying to put it into a ContainerManifest - // will not give an error but also won't set any of the fields. - // Our docs say that the version field is mandatory, so using that to judge wether - // this was actually successful. - singleErr = fmt.Errorf("got blank version field") - } - if singleErr == nil { - updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}} - return nil - } - - // That didn't work, so try an array of manifests. - var manifests []api.ContainerManifest - multiErr := yaml.Unmarshal(data, &manifests) - // We're not sure if the person reading the logs is going to care about the single or - // multiple manifest unmarshalling attempt, so we need to put both in the logs, as is - // done at the end. Hence not returning early here. - if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" { - multiErr = fmt.Errorf("got blank version field") - } - if multiErr == nil { - updateChannel <- manifestUpdate{httpClientSource, manifests} - return nil - } - return fmt.Errorf("%v: received '%v', but couldn't parse as a "+ - "single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n", - url, string(data), singleErr, manifest, multiErr, manifests) -} - -// ResponseToManifests takes an etcd Response object, and turns it into a structured list of containers. -// It returns a list of containers, or an error if one occurs. -func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) { - if response.Node == nil || len(response.Node.Value) == 0 { - return nil, fmt.Errorf("no nodes field: %v", response) - } - var manifests []api.ContainerManifest - err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests) - return manifests, err -} - -func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error { - response, err := kl.EtcdClient.Get(key, true, false) - if err != nil { - if tools.IsEtcdNotFound(err) { - return nil - } - glog.Errorf("Error on etcd get of %s: %v", key, err) - return err - } - manifests, err := kl.ResponseToManifests(response) - if err != nil { - glog.Errorf("Error parsing response (%v): %s", response, err) - return err - } - glog.Infof("Got state from etcd: %+v", manifests) - updateChannel <- manifestUpdate{etcdSource, manifests} - return nil -} - -// SyncAndSetupEtcdWatch synchronizes with etcd, and sets up an etcd watch for new configurations. -// The channel to send new configurations across -// This function loops forever and is intended to be run in a go routine. -func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) { - key := path.Join("registry", "hosts", strings.TrimSpace(kl.Hostname), "kubelet") - - // First fetch the initial configuration (watch only gives changes...) - for { - err := kl.getKubeletStateFromEtcd(key, updateChannel) - if err == nil { - // We got a successful response, etcd is up, set up the watch. - break - } - time.Sleep(30 * time.Second) - } - - done := make(chan bool) - go util.Forever(func() { kl.TimeoutWatch(done) }, 0) - for { - // The etcd client will close the watch channel when it exits. So we need - // to create and service a new one every time. - watchChannel := make(chan *etcd.Response) - // We don't push this through Forever because if it dies, we just do it again in 30 secs. - // anyway. - go kl.WatchEtcd(watchChannel, updateChannel) - - kl.getKubeletStateFromEtcd(key, updateChannel) - glog.V(1).Infof("Setting up a watch for configuration changes in etcd for %s", key) - kl.EtcdClient.Watch(key, 0, true, watchChannel, done) - } -} - -// TimeoutWatch timeout the watch after 30 seconds. -func (kl *Kubelet) TimeoutWatch(done chan bool) { - t := time.Tick(30 * time.Second) - for _ = range t { - done <- true - } -} - -// ExtractYAMLData extracts data from YAML file into a list of containers. -func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error { - if err := yaml.Unmarshal(buf, output); err != nil { - glog.Errorf("Couldn't unmarshal configuration: %v", err) - return err - } - return nil -} - -func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) { - var manifests []api.ContainerManifest - if response.Node == nil || len(response.Node.Value) == 0 { - return manifests, fmt.Errorf("no nodes field: %v", response) - } - err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests) - return manifests, err -} - -// WatchEtcd watches etcd for changes, receives config objects from the etcd client watch. -// This function loops until the watchChannel is closed, and is intended to be run as a goroutine. -func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) { - defer util.HandleCrash() - for { - watchResponse := <-watchChannel - // This means the channel has been closed. - if watchResponse == nil { - return - } - glog.Infof("Got etcd change: %v", watchResponse) - manifests, err := kl.extractFromEtcd(watchResponse) - if err != nil { - glog.Errorf("Error handling response from etcd: %v", err) - continue - } - glog.Infof("manifests: %+v", manifests) - // Ok, we have a valid configuration, send to channel for - // rejiggering. - updateChannel <- manifestUpdate{etcdSource, manifests} - } -} - const ( networkContainerName = "net" networkContainerImage = "kubernetes/pause:latest" ) -// Create a network container for a manifest. Returns the docker container ID of the newly created container. -func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) { +// createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container. +func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) { var ports []api.Port // Docker only exports ports from the network container. Let's // collect all of the relevant ports and export them. - for _, container := range manifest.Containers { + for _, container := range pod.Manifest.Containers { ports = append(ports, container.Ports...) } container := &api.Container{ @@ -566,32 +277,36 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock Ports: ports, } kl.DockerPuller.Pull(networkContainerImage) - return kl.runContainer(manifest, container, nil, "") + return kl.runContainer(pod, container, nil, "") } -func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainers DockerContainers, keepChannel chan<- DockerID) error { - // Make sure we have a network container +func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChannel chan<- DockerID) error { + podFullName := GetPodFullName(pod) + var netID DockerID - if networkDockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, networkContainerName); found { + if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found { netID = DockerID(networkDockerContainer.ID) } else { - dockerNetworkID, err := kl.createNetworkContainer(manifest) + glog.Infof("Network container doesn't exist, creating") + dockerNetworkID, err := kl.createNetworkContainer(pod) if err != nil { - glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID) + glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName) return err } netID = dockerNetworkID } keepChannel <- netID - podVolumes, err := kl.mountExternalVolumes(manifest) + + podVolumes, err := kl.mountExternalVolumes(&pod.Manifest) if err != nil { - glog.Errorf("Unable to mount volumes for manifest %s: (%v)", manifest.ID, err) + glog.Errorf("Unable to mount volumes for pod %s: (%v)", podFullName, err) } - for _, container := range manifest.Containers { - if dockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, container.Name); found { + + for _, container := range pod.Manifest.Containers { + if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found { containerID := DockerID(dockerContainer.ID) - glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID) - glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID) + glog.Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID) + glog.V(1).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID) // TODO: This should probably be separated out into a separate goroutine. healthy, err := kl.healthy(container, dockerContainer) @@ -604,22 +319,22 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainer continue } - glog.V(1).Infof("manifest %s container %s is unhealthy %d.", manifest.ID, container.Name, healthy) + glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy) if err := kl.killContainer(*dockerContainer); err != nil { - glog.V(1).Infof("Failed to kill container %s: %v", containerID, err) + glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err) continue } } - glog.Infof("%+v doesn't exist, creating", container) + glog.Infof("Container doesn't exist, creating %#v", container) if err := kl.DockerPuller.Pull(container.Image); err != nil { - glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name) + glog.Errorf("Failed to pull image: %v skipping pod %s container %s.", err, podFullName, container.Name) continue } - containerID, err := kl.runContainer(manifest, &container, podVolumes, "container:"+string(netID)) + containerID, err := kl.runContainer(pod, &container, podVolumes, "container:"+string(netID)) if err != nil { // TODO(bburns) : Perhaps blacklist a container after N failures? - glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err) + glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err) continue } keepChannel <- containerID @@ -629,9 +344,10 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainer type empty struct{} -// SyncManifests synchronizes the configured list of containers (desired state) with the host current state. -func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { - glog.Infof("Desired: %+v", config) +// SyncPods synchronizes the configured list of pods (desired state) with the host current state. +func (kl *Kubelet) SyncPods(pods []Pod) error { + glog.Infof("Desired [%s]: %+v", kl.Hostname, pods) + var err error dockerIdsToKeep := map[DockerID]empty{} keepChannel := make(chan DockerID, defaultChanSize) waitGroup := sync.WaitGroup{} @@ -643,18 +359,18 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { } // Check for any containers that need starting - for ix := range config { + for i := range pods { waitGroup.Add(1) go func(index int) { defer util.HandleCrash() defer waitGroup.Done() // necessary to dereference by index here b/c otherwise the shared value // in the for each is re-used. - err := kl.syncManifest(&config[index], dockerContainers, keepChannel) + err := kl.syncPod(&pods[index], dockerContainers, keepChannel) if err != nil { - glog.Errorf("Error syncing manifest: %v skipping.", err) + glog.Errorf("Error syncing pod: %v skipping.", err) } - }(ix) + }(i) } ch := make(chan bool) go func() { @@ -663,7 +379,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { } ch <- true }() - if len(config) > 0 { + if len(pods) > 0 { waitGroup.Wait() } close(keepChannel) @@ -687,69 +403,51 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { return err } -// Check that all Port.HostPort values are unique across all manifests. -func checkHostPortConflicts(allManifests []api.ContainerManifest, newManifest *api.ContainerManifest) []error { - allErrs := []error{} - - allPorts := map[int]bool{} +// filterHostPortConflicts removes pods that conflict on Port.HostPort values +func filterHostPortConflicts(pods []Pod) []Pod { + filtered := []Pod{} + ports := map[int]bool{} extract := func(p *api.Port) int { return p.HostPort } - for i := range allManifests { - manifest := &allManifests[i] - errs := api.AccumulateUniquePorts(manifest.Containers, allPorts, extract) - if len(errs) != 0 { - allErrs = append(allErrs, errs...) + for i := range pods { + pod := &pods[i] + if errs := api.AccumulateUniquePorts(pod.Manifest.Containers, ports, extract); len(errs) != 0 { + glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs) + continue } + filtered = append(filtered, *pod) } - if errs := api.AccumulateUniquePorts(newManifest.Containers, allPorts, extract); len(errs) != 0 { - allErrs = append(allErrs, errs...) - } - return allErrs + + return filtered } // syncLoop is the main loop for processing changes. It watches for changes from // four channels (file, etcd, server, and http) and creates a union of them. For // any new change seen, will run a sync against desired state and running state. If // no changes are seen to the configuration, will synchronize the last known desired -// state every sync_frequency seconds. -// Never returns. -func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) { - last := make(map[string][]api.ContainerManifest) +// state every sync_frequency seconds. Never returns. +func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { for { + var pods []Pod select { - case u := <-updateChannel: - glog.Infof("Got configuration from %s: %+v", u.source, u.manifests) - last[u.source] = u.manifests - case <-time.After(kl.SyncFrequency): - } + case u := <-updates: + switch u.Op { + case SET: + glog.Infof("Containers changed [%s]", kl.Hostname) + pods = u.Pods - allManifests := []api.ContainerManifest{} - allIds := util.StringSet{} - for src, srcManifests := range last { - for i := range srcManifests { - allErrs := []error{} + case UPDATE: + //TODO: implement updates of containers + glog.Infof("Containers updated, not implemented [%s]", kl.Hostname) + continue - m := &srcManifests[i] - if allIds.Has(m.ID) { - allErrs = append(allErrs, api.ValidationError{api.ErrTypeDuplicate, "ContainerManifest.ID", m.ID}) - } else { - allIds.Insert(m.ID) - } - if errs := api.ValidateManifest(m); len(errs) != 0 { - allErrs = append(allErrs, errs...) - } - // Check for host-wide HostPort conflicts. - if errs := checkHostPortConflicts(allManifests, m); len(errs) != 0 { - allErrs = append(allErrs, errs...) - } - if len(allErrs) > 0 { - glog.Warningf("Manifest from %s failed validation, ignoring: %v", src, allErrs) - } + default: + panic("syncLoop does not support incremental changes") } - // TODO(thockin): There's no reason to collect manifests by value. Don't pessimize. - allManifests = append(allManifests, srcManifests...) } - err := handler.SyncManifests(allManifests) + pods = filterHostPortConflicts(pods) + + err := handler.SyncPods(pods) if err != nil { glog.Errorf("Couldn't sync containers : %v", err) } @@ -778,12 +476,12 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai } // GetPodInfo returns information from Docker about the containers in a pod -func (kl *Kubelet) GetPodInfo(manifestID string) (api.PodInfo, error) { - return getDockerPodInfo(kl.DockerClient, manifestID) +func (kl *Kubelet) GetPodInfo(podFullName string) (api.PodInfo, error) { + return getDockerPodInfo(kl.DockerClient, podFullName) } // GetContainerInfo returns stats (from Cadvisor) for a container. -func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { +func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { if kl.CadvisorClient == nil { return nil, nil } @@ -791,7 +489,7 @@ func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info. if err != nil { return nil, err } - dockerContainer, found := dockerContainers.FindPodContainer(manifestID, containerName) + dockerContainer, found := dockerContainers.FindPodContainer(podFullName, containerName) if !found { return nil, errors.New("couldn't find container") } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index cea31dd716e..75a2c739d6a 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -19,8 +19,6 @@ package kubelet import ( "encoding/json" "fmt" - "io/ioutil" - "net/http/httptest" "reflect" "sync" "testing" @@ -28,9 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" - "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" "github.com/google/cadvisor/info" "github.com/stretchr/testify/mock" @@ -43,29 +39,6 @@ func expectNoError(t *testing.T, err error) { } } -// These are used for testing extract json (below) -type TestData struct { - Value string - Number int -} - -type TestObject struct { - Name string - Data TestData -} - -func verifyStringEquals(t *testing.T, actual, expected string) { - if actual != expected { - t.Errorf("Verification failed. Expected: %s, Found %s", expected, actual) - } -} - -func verifyIntEquals(t *testing.T, actual, expected int) { - if actual != expected { - t.Errorf("Verification failed. Expected: %d, Found %d", expected, actual) - } -} - func verifyNoError(t *testing.T, e error) { if e != nil { t.Errorf("Expected no error, found %#v", e) @@ -91,17 +64,6 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker return kubelet, fakeEtcdClient, fakeDocker } -func TestExtractJSON(t *testing.T) { - obj := TestObject{} - kubelet, _, _ := makeTestKubelet(t) - data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }` - kubelet.ExtractYAMLData([]byte(data), &obj) - - verifyStringEquals(t, obj.Name, "foo") - verifyStringEquals(t, obj.Data.Value, "bar") - verifyIntEquals(t, obj.Data.Number, 10) -} - func verifyCalls(t *testing.T, fakeDocker *FakeDockerClient, calls []string) { verifyStringArrayEquals(t, fakeDocker.called, calls) } @@ -120,14 +82,15 @@ func verifyStringArrayEquals(t *testing.T, actual, expected []string) { } } -func verifyPackUnpack(t *testing.T, manifestID, containerName string) { +func verifyPackUnpack(t *testing.T, podNamespace, podName, containerName string) { name := buildDockerName( - &api.ContainerManifest{ID: manifestID}, + &Pod{Name: podName, Namespace: podNamespace}, &api.Container{Name: containerName}, ) - returnedManifestID, returnedContainerName := parseDockerName(name) - if manifestID != returnedManifestID || containerName != returnedContainerName { - t.Errorf("For (%s, %s), unpacked (%s, %s)", manifestID, containerName, returnedManifestID, returnedContainerName) + podFullName := fmt.Sprintf("%s.%s", podName, podNamespace) + returnedPodFullName, returnedContainerName := parseDockerName(name) + if podFullName != returnedPodFullName || containerName != returnedContainerName { + t.Errorf("For (%s, %s), unpacked (%s, %s)", podFullName, containerName, returnedPodFullName, returnedContainerName) } } @@ -138,11 +101,11 @@ func verifyBoolean(t *testing.T, expected, value bool) { } func TestContainerManifestNaming(t *testing.T) { - verifyPackUnpack(t, "manifest1234", "container5678") - verifyPackUnpack(t, "manifest--", "container__") - verifyPackUnpack(t, "--manifest", "__container") - verifyPackUnpack(t, "m___anifest_", "container-_-") - verifyPackUnpack(t, "_m___anifest", "-_-container") + verifyPackUnpack(t, "file", "manifest1234", "container5678") + verifyPackUnpack(t, "file", "manifest--", "container__") + verifyPackUnpack(t, "file", "--manifest", "__container") + verifyPackUnpack(t, "", "m___anifest_", "container-_-") + verifyPackUnpack(t, "other", "_m___anifest", "-_-container") } func TestGetContainerID(t *testing.T) { @@ -224,39 +187,12 @@ func TestKillContainer(t *testing.T) { verifyCalls(t, fakeDocker, []string{"stop"}) } -func TestResponseToContainersNil(t *testing.T) { - kubelet, _, _ := makeTestKubelet(t) - list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil}) - if len(list) != 0 { - t.Errorf("Unexpected non-zero list: %#v", list) - } - if err == nil { - t.Error("Unexpected non-error") - } -} - -func TestResponseToManifests(t *testing.T) { - kubelet, _, _ := makeTestKubelet(t) - list, err := kubelet.ResponseToManifests(&etcd.Response{ - Node: &etcd.Node{ - Value: util.MakeJSONString([]api.ContainerManifest{ - {ID: "foo"}, - {ID: "bar"}, - }), - }, - }) - if len(list) != 2 || list[0].ID != "foo" || list[1].ID != "bar" { - t.Errorf("Unexpected list: %#v", list) - } - expectNoError(t, err) -} - type channelReader struct { - list [][]api.ContainerManifest + list [][]Pod wg sync.WaitGroup } -func startReading(channel <-chan manifestUpdate) *channelReader { +func startReading(channel <-chan interface{}) *channelReader { cr := &channelReader{} cr.wg.Add(1) go func() { @@ -265,118 +201,44 @@ func startReading(channel <-chan manifestUpdate) *channelReader { if !ok { break } - cr.list = append(cr.list, update.manifests) + cr.list = append(cr.list, update.(PodUpdate).Pods) } cr.wg.Done() }() return cr } -func (cr *channelReader) GetList() [][]api.ContainerManifest { +func (cr *channelReader) GetList() [][]Pod { cr.wg.Wait() return cr.list } -func TestGetKubeletStateFromEtcdNoData(t *testing.T) { - kubelet, fakeClient, _ := makeTestKubelet(t) - channel := make(chan manifestUpdate) - reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: nil, - } - err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel) - if err == nil { - t.Error("Unexpected no err.") - } - close(channel) - list := reader.GetList() - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } -} - -func TestGetKubeletStateFromEtcd(t *testing.T) { - kubelet, fakeClient, _ := makeTestKubelet(t) - channel := make(chan manifestUpdate) - reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Value: util.MakeJSONString([]api.Container{}), - }, - }, - E: nil, - } - err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel) - expectNoError(t, err) - close(channel) - list := reader.GetList() - if len(list) != 1 { - t.Errorf("Unexpected list: %#v", list) - } -} - -func TestGetKubeletStateFromEtcdNotFound(t *testing.T) { - kubelet, fakeClient, _ := makeTestKubelet(t) - channel := make(chan manifestUpdate) - reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: tools.EtcdErrorNotFound, - } - err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel) - expectNoError(t, err) - close(channel) - list := reader.GetList() - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } -} - -func TestGetKubeletStateFromEtcdError(t *testing.T) { - kubelet, fakeClient, _ := makeTestKubelet(t) - channel := make(chan manifestUpdate) - reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: &etcd.EtcdError{ - ErrorCode: 200, // non not found error - }, - } - err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel) - if err == nil { - t.Error("Unexpected non-error") - } - close(channel) - list := reader.GetList() - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } -} - -func TestSyncManifestsDoesNothing(t *testing.T) { +func TestSyncPodsDoesNothing(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) fakeDocker.containerList = []docker.APIContainers{ { - // format is k8s---- - Names: []string{"/k8s--bar--foo"}, + // format is k8s---- + Names: []string{"/k8s--bar--foo.test"}, ID: "1234", }, { // network container - Names: []string{"/k8s--net--foo--"}, + Names: []string{"/k8s--net--foo.test--"}, ID: "9876", }, } fakeDocker.container = &docker.Container{ ID: "1234", } - err := kubelet.SyncManifests([]api.ContainerManifest{ + err := kubelet.SyncPods([]Pod{ { - ID: "foo", - Containers: []api.Container{ - {Name: "bar"}, + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar"}, + }, }, }, }) @@ -384,17 +246,17 @@ func TestSyncManifestsDoesNothing(t *testing.T) { verifyCalls(t, fakeDocker, []string{"list", "list"}) } -func TestSyncManifestsDeletes(t *testing.T) { +func TestSyncPodsDeletes(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) fakeDocker.containerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s--foo--bar"}, + Names: []string{"/k8s--foo--bar.test"}, ID: "1234", }, { // network container - Names: []string{"/k8s--net--foo--"}, + Names: []string{"/k8s--net--foo.test--"}, ID: "9876", }, { @@ -402,7 +264,7 @@ func TestSyncManifestsDeletes(t *testing.T) { ID: "4567", }, } - err := kubelet.SyncManifests([]api.ContainerManifest{}) + err := kubelet.SyncPods([]Pod{}) expectNoError(t, err) verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"}) @@ -425,29 +287,33 @@ func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status return health.Unhealthy, nil } -func TestSyncManifestsUnhealthy(t *testing.T) { +func TestSyncPodsUnhealthy(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) kubelet.HealthChecker = &FalseHealthChecker{} fakeDocker.containerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s--bar--foo"}, + Names: []string{"/k8s--bar--foo.test"}, ID: "1234", }, { // network container - Names: []string{"/k8s--net--foo--"}, + Names: []string{"/k8s--net--foo.test--"}, ID: "9876", }, } - err := kubelet.SyncManifests([]api.ContainerManifest{ + err := kubelet.SyncPods([]Pod{ { - ID: "foo", - Containers: []api.Container{ - {Name: "bar", - LivenessProbe: &api.LivenessProbe{ - // Always returns healthy == false - Type: "false", + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar", + LivenessProbe: &api.LivenessProbe{ + // Always returns healthy == false + Type: "false", + }, }, }, }, @@ -582,14 +448,20 @@ func TestMakeVolumesAndBinds(t *testing.T) { }, } + pod := Pod{ + Name: "pod", + Namespace: "test", + } + podVolumes := make(volumeMap) podVolumes["disk4"] = &volume.HostDirectory{"/mnt/host"} - volumes, binds := makeVolumesAndBinds("pod", &container, podVolumes) + volumes, binds := makeVolumesAndBinds(&pod, &container, podVolumes) expectedVolumes := []string{"/mnt/path", "/mnt/path2"} - expectedBinds := []string{"/exports/pod/disk:/mnt/path", "/exports/pod/disk2:/mnt/path2:ro", "/mnt/path3:/mnt/path3", + expectedBinds := []string{"/exports/pod.test/disk:/mnt/path", "/exports/pod.test/disk2:/mnt/path2:ro", "/mnt/path3:/mnt/path3", "/mnt/host:/mnt/path4"} + if len(volumes) != len(expectedVolumes) { t.Errorf("Unexpected volumes. Expected %#v got %#v. Container was: %#v", expectedVolumes, volumes, container) } @@ -669,274 +541,29 @@ func TestMakePortsAndBindings(t *testing.T) { } func TestCheckHostPortConflicts(t *testing.T) { - successCaseAll := []api.ContainerManifest{ - {Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}, - {Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, - {Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}, + successCaseAll := []Pod{ + {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } - successCaseNew := api.ContainerManifest{ - Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}, + successCaseNew := Pod{ + Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}}, } - if errs := checkHostPortConflicts(successCaseAll, &successCaseNew); len(errs) != 0 { - t.Errorf("Expected success: %v", errs) + expected := append(successCaseAll, successCaseNew) + if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) { + t.Errorf("Expected %#v, Got %#v", expected, actual) } - failureCaseAll := []api.ContainerManifest{ - {Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}, - {Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, - {Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}, + failureCaseAll := []Pod{ + {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } - failureCaseNew := api.ContainerManifest{ - Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}, + failureCaseNew := Pod{ + Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, } - if errs := checkHostPortConflicts(failureCaseAll, &failureCaseNew); len(errs) == 0 { - t.Errorf("Expected failure") - } -} - -func TestExtractFromNonExistentFile(t *testing.T) { - kubelet := New() - _, err := kubelet.extractFromFile("/some/fake/file") - if err == nil { - t.Error("Unexpected non-error.") - } -} - -func TestExtractFromBadDataFile(t *testing.T) { - kubelet := New() - - badData := []byte{1, 2, 3} - file, err := ioutil.TempFile("", "foo") - expectNoError(t, err) - name := file.Name() - file.Close() - ioutil.WriteFile(name, badData, 0755) - _, err = kubelet.extractFromFile(name) - - if err == nil { - t.Error("Unexpected non-error.") - } -} - -func TestExtractFromValidDataFile(t *testing.T) { - kubelet := New() - - manifest := api.ContainerManifest{ID: "bar"} - data, err := json.Marshal(manifest) - expectNoError(t, err) - file, err := ioutil.TempFile("", "foo") - expectNoError(t, err) - name := file.Name() - expectNoError(t, file.Close()) - ioutil.WriteFile(name, data, 0755) - - read, err := kubelet.extractFromFile(name) - expectNoError(t, err) - if !reflect.DeepEqual(read, manifest) { - t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read) - } -} - -func TestExtractFromEmptyDir(t *testing.T) { - kubelet := New() - - dirName, err := ioutil.TempDir("", "foo") - expectNoError(t, err) - - _, err = kubelet.extractFromDir(dirName) - expectNoError(t, err) -} - -func TestExtractFromDir(t *testing.T) { - kubelet := New() - - manifests := []api.ContainerManifest{ - {ID: "aaaa"}, - {ID: "bbbb"}, - } - - dirName, err := ioutil.TempDir("", "foo") - expectNoError(t, err) - - for _, manifest := range manifests { - data, err := json.Marshal(manifest) - expectNoError(t, err) - file, err := ioutil.TempFile(dirName, manifest.ID) - expectNoError(t, err) - name := file.Name() - expectNoError(t, file.Close()) - ioutil.WriteFile(name, data, 0755) - } - - read, err := kubelet.extractFromDir(dirName) - expectNoError(t, err) - if !reflect.DeepEqual(read, manifests) { - t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read) - } -} - -func TestExtractFromHttpBadness(t *testing.T) { - kubelet := New() - updateChannel := make(chan manifestUpdate) - reader := startReading(updateChannel) - - err := kubelet.extractFromHTTP("http://localhost:12345", updateChannel) - if err == nil { - t.Error("Unexpected non-error.") - } - close(updateChannel) - list := reader.GetList() - - if len(list) != 0 { - t.Errorf("Unexpected list: %#v", list) - } -} - -func TestExtractFromHttpSingle(t *testing.T) { - kubelet := New() - updateChannel := make(chan manifestUpdate) - reader := startReading(updateChannel) - - manifests := []api.ContainerManifest{ - {Version: "v1beta1", ID: "foo"}, - } - // Taking a single-manifest from a URL allows kubelet to be used - // in the implementation of google's container VM image. - data, err := json.Marshal(manifests[0]) - - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(data), - } - testServer := httptest.NewServer(&fakeHandler) - - err = kubelet.extractFromHTTP(testServer.URL, updateChannel) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - close(updateChannel) - - read := reader.GetList() - - if len(read) != 1 { - t.Errorf("Unexpected list: %#v", read) - return - } - if !reflect.DeepEqual(manifests, read[0]) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0]) - } -} - -func TestExtractFromHttpMultiple(t *testing.T) { - kubelet := New() - updateChannel := make(chan manifestUpdate) - reader := startReading(updateChannel) - - manifests := []api.ContainerManifest{ - {Version: "v1beta1", ID: "foo"}, - {Version: "v1beta1", ID: "bar"}, - } - data, err := json.Marshal(manifests) - if err != nil { - t.Fatalf("Some weird json problem: %v", err) - } - - t.Logf("Serving: %v", string(data)) - - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(data), - } - testServer := httptest.NewServer(&fakeHandler) - - err = kubelet.extractFromHTTP(testServer.URL, updateChannel) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - close(updateChannel) - - read := reader.GetList() - - if len(read) != 1 { - t.Errorf("Unexpected list: %#v", read) - return - } - if !reflect.DeepEqual(manifests, read[0]) { - t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0]) - } -} - -func TestExtractFromHttpEmptyArray(t *testing.T) { - kubelet := New() - updateChannel := make(chan manifestUpdate) - reader := startReading(updateChannel) - - manifests := []api.ContainerManifest{} - data, err := json.Marshal(manifests) - if err != nil { - t.Fatalf("Some weird json problem: %v", err) - } - - t.Logf("Serving: %v", string(data)) - - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(data), - } - testServer := httptest.NewServer(&fakeHandler) - - err = kubelet.extractFromHTTP(testServer.URL, updateChannel) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - close(updateChannel) - - read := reader.GetList() - - if len(read) != 1 { - t.Errorf("Unexpected list: %#v", read) - return - } - if len(read[0]) != 0 { - t.Errorf("Unexpected manifests: %#v", read[0]) - } -} - -func TestWatchEtcd(t *testing.T) { - watchChannel := make(chan *etcd.Response) - updateChannel := make(chan manifestUpdate) - kubelet := New() - reader := startReading(updateChannel) - - manifest := []api.ContainerManifest{ - { - ID: "foo", - }, - } - data, err := json.Marshal(manifest) - expectNoError(t, err) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - kubelet.WatchEtcd(watchChannel, updateChannel) - wg.Done() - }() - - watchChannel <- &etcd.Response{ - Node: &etcd.Node{ - Value: string(data), - }, - } - close(watchChannel) - wg.Wait() - close(updateChannel) - - read := reader.GetList() - if len(read) != 1 { - t.Errorf("Expected number of results: %v", len(read)) - } else if !reflect.DeepEqual(read[0], manifest) { - t.Errorf("Unexpected manifest(s) %#v %#v", read[0], manifest) + if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) { + t.Errorf("Expected %#v, Got %#v", expected, actual) } } diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 78d8a8ffa04..2b0f93f8482 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -22,28 +22,49 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "net/url" "path" + "strconv" "strings" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/golang/glog" "github.com/google/cadvisor/info" "gopkg.in/v1/yaml" ) // Server is a http.Handler which exposes kubelet functionality over HTTP. type Server struct { - Kubelet kubeletInterface - UpdateChannel chan<- manifestUpdate - DelegateHandler http.Handler + host HostInterface + updates chan<- interface{} + handler http.Handler } -// kubeletInterface contains all the kubelet methods required by the server. +func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, delegate http.Handler, address string, port uint) { + glog.Infof("Starting to listen on %s:%d", address, port) + handler := Server{ + host: host, + updates: updates, + handler: delegate, + } + s := &http.Server{ + Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)), + Handler: &handler, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + } + s.ListenAndServe() +} + +// HostInterface contains all the kubelet methods required by the server. // For testablitiy. -type kubeletInterface interface { - GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) +type HostInterface interface { + GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) GetMachineInfo() (*info.MachineInfo, error) GetPodInfo(name string) (api.PodInfo, error) @@ -78,13 +99,15 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { } if u.Path == "/container" { // This is to provide backward compatibility. It only supports a single manifest - var manifest api.ContainerManifest - err = yaml.Unmarshal(data, &manifest) + var pod Pod + err = yaml.Unmarshal(data, &pod.Manifest) if err != nil { s.error(w, err) return } - s.UpdateChannel <- manifestUpdate{httpServerSource, []api.ContainerManifest{manifest}} + //TODO: sha1 of manifest? + pod.Name = "1" + s.updates <- PodUpdate{[]Pod{pod}, SET} } else if u.Path == "/containers" { var manifests []api.ContainerManifest err = yaml.Unmarshal(data, &manifests) @@ -92,15 +115,23 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.error(w, err) return } - s.UpdateChannel <- manifestUpdate{httpServerSource, manifests} + pods := make([]Pod, len(manifests)) + for i := range manifests { + pods[i].Name = fmt.Sprintf("%d", i+1) + pods[i].Manifest = manifests[i] + } + s.updates <- PodUpdate{pods, SET} } case u.Path == "/podInfo": podID := u.Query().Get("podID") if len(podID) == 0 { + w.WriteHeader(http.StatusBadRequest) http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest) return } - info, err := s.Kubelet.GetPodInfo(podID) + // TODO: backwards compatibility with existing API, needs API change + podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + info, err := s.host.GetPodInfo(podFullName) if err == ErrNoContainersInPod { http.Error(w, "Pod does not exist", http.StatusNotFound) return @@ -120,7 +151,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { case strings.HasPrefix(u.Path, "/stats"): s.serveStats(w, req) case strings.HasPrefix(u.Path, "/spec"): - info, err := s.Kubelet.GetMachineInfo() + info, err := s.host.GetMachineInfo() if err != nil { s.error(w, err) return @@ -133,14 +164,16 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.Header().Add("Content-type", "application/json") w.Write(data) case strings.HasPrefix(u.Path, "/logs/"): - s.Kubelet.ServeLogs(w, req) + s.host.ServeLogs(w, req) default: - s.DelegateHandler.ServeHTTP(w, req) + if s.handler != nil { + s.handler.ServeHTTP(w, req) + } } } func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { - // /stats// + // /stats// components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/") var stats *info.ContainerInfo var err error @@ -153,13 +186,13 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { switch len(components) { case 1: // Machine stats - stats, err = s.Kubelet.GetRootInfo(&query) + stats, err = s.host.GetRootInfo(&query) case 2: // pod stats // TODO(monnand) Implement this errors.New("pod level status currently unimplemented") case 3: - stats, err = s.Kubelet.GetContainerInfo(components[1], components[2], &query) + stats, err = s.host.GetContainerInfo(components[1], components[2], &query) default: http.Error(w, "unknown resource.", http.StatusNotFound) return diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 0f838bc1a2e..42493516663 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -36,7 +36,7 @@ import ( type fakeKubelet struct { infoFunc func(name string) (api.PodInfo, error) - containerInfoFunc func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) + containerInfoFunc func(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error) machineInfoFunc func() (*info.MachineInfo, error) logFunc func(w http.ResponseWriter, req *http.Request) @@ -46,8 +46,8 @@ func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) { return fk.infoFunc(name) } -func (fk *fakeKubelet) GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { - return fk.containerInfoFunc(podID, containerName, req) +func (fk *fakeKubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { + return fk.containerInfoFunc(podFullName, containerName, req) } func (fk *fakeKubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { @@ -63,7 +63,7 @@ func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { } type serverTestFramework struct { - updateChan chan manifestUpdate + updateChan chan interface{} updateReader *channelReader serverUnderTest *Server fakeKubelet *fakeKubelet @@ -72,13 +72,13 @@ type serverTestFramework struct { func makeServerTest() *serverTestFramework { fw := &serverTestFramework{ - updateChan: make(chan manifestUpdate), + updateChan: make(chan interface{}), } fw.updateReader = startReading(fw.updateChan) fw.fakeKubelet = &fakeKubelet{} fw.serverUnderTest = &Server{ - Kubelet: fw.fakeKubelet, - UpdateChannel: fw.updateChan, + host: fw.fakeKubelet, + updates: fw.updateChan, } fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) return fw @@ -106,8 +106,9 @@ func TestContainer(t *testing.T) { if len(received) != 1 { t.Errorf("Expected 1 manifest, but got %v", len(received)) } - if !reflect.DeepEqual(expected, received[0]) { - t.Errorf("Expected %#v, but got %#v", expected, received[0]) + expectedPods := []Pod{Pod{Name: "1", Manifest: expected[0]}} + if !reflect.DeepEqual(expectedPods, received[0]) { + t.Errorf("Expected %#v, but got %#v", expectedPods, received[0]) } } @@ -128,8 +129,9 @@ func TestContainers(t *testing.T) { if len(received) != 1 { t.Errorf("Expected 1 update, but got %v", len(received)) } - if !reflect.DeepEqual(expected, received[0]) { - t.Errorf("Expected %#v, but got %#v", expected, received[0]) + expectedPods := []Pod{Pod{Name: "1", Manifest: expected[0]}, Pod{Name: "2", Manifest: expected[1]}} + if !reflect.DeepEqual(expectedPods, received[0]) { + t.Errorf("Expected %#v, but got %#v", expectedPods, received[0]) } } @@ -137,10 +139,10 @@ func TestPodInfo(t *testing.T) { fw := makeServerTest() expected := api.PodInfo{"goodpod": docker.Container{ID: "myContainerID"}} fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) { - if name == "goodpod" { + if name == "goodpod.etcd" { return expected, nil } - return nil, fmt.Errorf("bad pod") + return nil, fmt.Errorf("bad pod %s", name) } resp, err := http.Get(fw.testHTTPServer.URL + "/podInfo?podID=goodpod") if err != nil {