From d63162b7e7cdbe0d902e9a10ae1fa712e003d34c Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 21 Jan 2015 21:57:14 -0800 Subject: [PATCH] Make kubelet's HTTP source go through conversion --- pkg/kubelet/config/file_test.go | 93 ++++++++++++++++++++++++++++ pkg/kubelet/config/http.go | 105 +++++++++++++++++++++----------- pkg/kubelet/config/http_test.go | 50 ++++++++++++--- 3 files changed, 207 insertions(+), 41 deletions(-) diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index 34f5332bec6..ade512aa7aa 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -170,6 +170,99 @@ func TestReadFromFile(t *testing.T) { } } +func TestReadFromFileWithoutID(t *testing.T) { + file := writeTestFile(t, os.TempDir(), "test_pod_config", + `{ + "version": "v1beta1", + "uuid": "12345", + "containers": [{ "image": "test/image", imagePullPolicy: "PullAlways"}] + }`) + defer os.Remove(file.Name()) + + ch := make(chan interface{}) + NewSourceFile(file.Name(), time.Millisecond, ch) + select { + case got := <-ch: + update := got.(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{ + ObjectMeta: api.ObjectMeta{ + Name: "", + UID: "12345", + Namespace: "", + SelfLink: "", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "test/image", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: api.PullAlways, + }, + }, + }, + }) + + if len(update.Pods[0].ObjectMeta.Name) == 0 { + t.Errorf("Name did not get defaulted") + } + update.Pods[0].ObjectMeta.Name = "" + update.Pods[0].ObjectMeta.Namespace = "" + update.Pods[0].ObjectMeta.SelfLink = "" + + if !api.Semantic.DeepEqual(expected, update) { + t.Fatalf("Expected %#v, Got %#v", expected, update) + } + + case <-time.After(2 * time.Millisecond): + t.Errorf("Expected update, timeout instead") + } +} + +func TestReadV1Beta2FromFile(t *testing.T) { + file := writeTestFile(t, os.TempDir(), "test_pod_config", + `{ + "version": "v1beta2", + "uuid": "12345", + "id": "test", + "containers": [{ "image": "test/image", imagePullPolicy: "PullAlways"}] + }`) + defer os.Remove(file.Name()) + + ch := make(chan interface{}) + NewSourceFile(file.Name(), time.Millisecond, ch) + select { + case got := <-ch: + update := got.(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{ + ObjectMeta: api.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "", + SelfLink: "", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "test/image", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: api.PullAlways, + }, + }, + }, + }) + + update.Pods[0].ObjectMeta.Namespace = "" + update.Pods[0].ObjectMeta.SelfLink = "" + + if !api.Semantic.DeepEqual(expected, update) { + t.Fatalf("Expected %#v, Got %#v", expected, update) + } + + case <-time.After(2 * time.Millisecond): + t.Errorf("Expected update, timeout instead") + } +} + func TestReadFromFileWithDefaults(t *testing.T) { file := writeTestFile(t, os.TempDir(), "test_pod_config", `{ diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index a57cc446ef4..fb30489e867 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -28,6 +28,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" @@ -84,40 +85,25 @@ func (s *sourceURL) extractFromURL() error { s.data = data // First try as if it's a single manifest - var manifest api.ContainerManifest - // TODO: should be api.Scheme.Decode - singleErr := yaml.Unmarshal(data, &manifest) - if singleErr == nil { - if errs := validation.ValidateManifest(&manifest); len(errs) > 0 { - singleErr = fmt.Errorf("invalid manifest: %v", errs) - } - } - if singleErr == nil { - pod := api.BoundPod{} - if err := api.Scheme.Convert(&manifest, &pod); err != nil { - return err + parsed, manifest, pod, singleErr := tryDecodeSingle(data) + if parsed { + if singleErr != nil { + // It parsed but could not be used. + return singleErr } + // It parsed! applyDefaults(&pod, s.url) s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.HTTPSource} return nil } // That didn't work, so try an array of manifests. - var manifests []api.ContainerManifest - // TODO: should be api.Scheme.Decode - multiErr := yaml.Unmarshal(data, &manifests) - // We're not sure if the person reading the logs is going to care about the single or - // multiple manifest unmarshalling attempt, so we need to put both in the logs, as is - // done at the end. Hence not returning early here. - if multiErr == nil { - for _, manifest := range manifests { - if errs := validation.ValidateManifest(&manifest); len(errs) > 0 { - multiErr = fmt.Errorf("invalid manifest: %v", errs) - break - } + parsed, manifests, pods, multiErr := tryDecodeList(data) + if parsed { + if multiErr != nil { + // It parsed but could not be used. + return multiErr } - } - if multiErr == nil { // A single manifest that did not pass semantic validation will yield an empty // array of manifests (and no error) when unmarshaled as such. In that case, // if the single manifest at least had a Version, we return the single-manifest @@ -125,16 +111,12 @@ func (s *sourceURL) extractFromURL() error { if len(manifests) == 0 && len(manifest.Version) != 0 { return singleErr } - list := api.ContainerManifestList{Items: manifests} - boundPods := &api.BoundPods{} - if err := api.Scheme.Convert(&list, boundPods); err != nil { - return err - } - for i := range boundPods.Items { - pod := &boundPods.Items[i] + // Assume it parsed. + for i := range pods.Items { + pod := &pods.Items[i] applyDefaults(pod, s.url) } - s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.HTTPSource} + s.updates <- kubelet.PodUpdate{pods.Items, kubelet.SET, kubelet.HTTPSource} return nil } @@ -143,6 +125,61 @@ func (s *sourceURL) extractFromURL() error { s.url, string(data), singleErr, manifest, multiErr, manifests) } +func tryDecodeSingle(data []byte) (parsed bool, manifest v1beta1.ContainerManifest, pod api.BoundPod, err error) { + // TODO: should be api.Scheme.Decode + // This is awful. DecodeInto() expects to find an APIObject, which + // Manifest is not. We keep reading manifest for now for compat, but + // we will eventually change it to read Pod (at which point this all + // becomes nicer). Until then, we assert that the ContainerManifest + // structure on disk is always v1beta1. Read that, convert it to a + // "current" ContainerManifest (should be ~identical), then convert + // that to a BoundPod (which is a well-understood conversion). This + // avoids writing a v1beta1.ContainerManifest -> api.BoundPod + // conversion which would be identical to the api.ContainerManifest -> + // api.BoundPod conversion. + if err = yaml.Unmarshal(data, &manifest); err != nil { + return false, manifest, pod, err + } + newManifest := api.ContainerManifest{} + if err = api.Scheme.Convert(&manifest, &newManifest); err != nil { + return false, manifest, pod, err + } + if errs := validation.ValidateManifest(&newManifest); len(errs) > 0 { + err = fmt.Errorf("invalid manifest: %v", errs) + return false, manifest, pod, err + } + if err = api.Scheme.Convert(&newManifest, &pod); err != nil { + return true, manifest, pod, err + } + // Success. + return true, manifest, pod, nil +} + +func tryDecodeList(data []byte) (parsed bool, manifests []v1beta1.ContainerManifest, pods api.BoundPods, err error) { + // TODO: should be api.Scheme.Decode + // See the comment in tryDecodeSingle(). + if err = yaml.Unmarshal(data, &manifests); err != nil { + return false, manifests, pods, err + } + newManifests := []api.ContainerManifest{} + if err = api.Scheme.Convert(&manifests, &newManifests); err != nil { + return false, manifests, pods, err + } + for i := range newManifests { + manifest := &newManifests[i] + if errs := validation.ValidateManifest(manifest); len(errs) > 0 { + err = fmt.Errorf("invalid manifest: %v", errs) + return false, manifests, pods, err + } + } + list := api.ContainerManifestList{Items: newManifests} + if err = api.Scheme.Convert(&list, &pods); err != nil { + return true, manifests, pods, err + } + // Success. + return true, manifests, pods, nil +} + func applyDefaults(pod *api.BoundPod, url string) { if len(pod.UID) == 0 { hasher := md5.New() diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index 38f157072e9..5036ab55db8 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -122,8 +123,9 @@ func TestExtractFromHTTP(t *testing.T) { expected kubelet.PodUpdate }{ { - desc: "Single manifest", - manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo", UUID: "111"}, + desc: "Single manifest", + manifests: v1beta1.ContainerManifest{Version: "v1beta1", ID: "foo", UUID: "111", + Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}}, expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource, api.BoundPod{ @@ -135,6 +137,11 @@ func TestExtractFromHTTP(t *testing.T) { Spec: api.PodSpec{ RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, DNSPolicy: api.DNSClusterFirst, + Containers: []api.Container{{ + Name: "1", + Image: "foo", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: "Always"}}, }, }), }, @@ -155,11 +162,36 @@ func TestExtractFromHTTP(t *testing.T) { }, }), }, + { + desc: "Single manifest with v1beta2", + manifests: v1beta1.ContainerManifest{Version: "v1beta2", ID: "foo", UUID: "111", + Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}}, + expected: CreatePodUpdate(kubelet.SET, + kubelet.HTTPSource, + api.BoundPod{ + ObjectMeta: api.ObjectMeta{ + UID: "111", + Name: "foo", + Namespace: "foobar", + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + DNSPolicy: api.DNSClusterFirst, + Containers: []api.Container{{ + Name: "1", + Image: "foo", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: "Always"}}, + }, + }), + }, { desc: "Multiple manifests", - manifests: []api.ContainerManifest{ - {Version: "v1beta1", ID: "foo", UUID: "111", Containers: []api.Container{{Name: "1", Image: "foo"}}}, - {Version: "v1beta1", ID: "bar", UUID: "222", Containers: []api.Container{{Name: "1", Image: "foo"}}}, + manifests: []v1beta1.ContainerManifest{ + {Version: "v1beta1", ID: "foo", UUID: "111", + Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}}, + {Version: "v1beta1", ID: "bar", UUID: "222", + Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: ""}}}, }, expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource, @@ -170,11 +202,13 @@ func TestExtractFromHTTP(t *testing.T) { Namespace: "foobar", }, Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ Name: "1", Image: "foo", TerminationMessagePath: "/dev/termination-log", - ImagePullPolicy: "IfNotPresent"}}, + ImagePullPolicy: "Always"}}, }, }, api.BoundPod{ @@ -184,6 +218,8 @@ func TestExtractFromHTTP(t *testing.T) { Namespace: "foobar", }, Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ Name: "1", Image: "foo", @@ -194,7 +230,7 @@ func TestExtractFromHTTP(t *testing.T) { }, { desc: "Empty Array", - manifests: []api.ContainerManifest{}, + manifests: []v1beta1.ContainerManifest{}, expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource), }, }