From c512c549a154a69779c4cf20d0fc9e0aef90607a Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 18 Mar 2015 15:27:44 +0100 Subject: [PATCH] Kubelet http read both ContainerManifest and Pod --- pkg/kubelet/config/http.go | 85 ++++++++++++++- pkg/kubelet/config/http_test.go | 186 +++++++++++++++++++++++++++++++- 2 files changed, 265 insertions(+), 6 deletions(-) diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index 54ead0c0e74..68d6e807d28 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -123,9 +123,36 @@ func (s *sourceURL) extractFromURL() error { return nil } - return fmt.Errorf("%v: received '%v', but couldn't parse as a "+ - "single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n", - s.url, string(data), singleErr, manifest, multiErr, manifests) + // Parsing it as ContainerManifest(s) failed. + // Try to parse it as Pod(s). + + // First try as it is a single pod. + parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.url) + if parsed { + if singlePodErr != nil { + // It parsed but could not be used. + return singlePodErr + } + s.updates <- kubelet.PodUpdate{[]api.Pod{pod}, kubelet.SET, kubelet.HTTPSource} + return nil + } + + // That didn't work, so try a list of pods. + parsed, pods, multiPodErr := tryDecodePodList(data, s.url) + if parsed { + if multiPodErr != nil { + // It parsed but could not be used. + return multiPodErr + } + s.updates <- kubelet.PodUpdate{pods.Items, kubelet.SET, kubelet.HTTPSource} + return nil + } + + return fmt.Errorf("%v: received '%v', but couldn't parse as neither "+ + "single (%v: %+v) or multiple manifests (%v: %+v) nor "+ + "single (%v) or multiple pods (%v).\n", + s.url, string(data), singleErr, manifest, multiErr, manifests, + singlePodErr, multiPodErr) } func tryDecodeSingle(data []byte) (parsed bool, manifest v1beta1.ContainerManifest, pod api.Pod, err error) { @@ -183,6 +210,53 @@ func tryDecodeList(data []byte) (parsed bool, manifests []v1beta1.ContainerManif return true, manifests, pods, nil } +func tryDecodeSinglePod(data []byte, url string) (parsed bool, pod api.Pod, err error) { + obj, err := api.Scheme.Decode(data) + if err != nil { + return false, pod, err + } + // Check whether the object could be converted to single pod. + if _, ok := obj.(*api.Pod); !ok { + err = fmt.Errorf("invalid pod: %+v", obj) + return false, pod, err + } + newPod := obj.(*api.Pod) + // Apply default values and validate the pod. + if err = applyDefaults(newPod, url); err != nil { + return true, pod, err + } + if errs := validation.ValidatePod(newPod); len(errs) > 0 { + err = fmt.Errorf("invalid pod: %v", errs) + return true, pod, err + } + return true, *newPod, nil +} + +func tryDecodePodList(data []byte, url string) (parsed bool, pods api.PodList, err error) { + obj, err := api.Scheme.Decode(data) + if err != nil { + return false, pods, err + } + // Check whether the object could be converted to list of pods. + if _, ok := obj.(*api.PodList); !ok { + err = fmt.Errorf("invalid pods list: %+v", obj) + return false, pods, err + } + newPods := obj.(*api.PodList) + // Apply default values and validate pods. + for i := range newPods.Items { + newPod := &newPods.Items[i] + if err = applyDefaults(newPod, url); err != nil { + return true, pods, err + } + if errs := validation.ValidatePod(newPod); len(errs) > 0 { + err = fmt.Errorf("invalid pod: %v", errs) + return true, pods, err + } + } + return true, *newPods, err +} + func applyDefaults(pod *api.Pod, url string) error { if len(pod.UID) == 0 { hasher := md5.New() @@ -203,8 +277,9 @@ func applyDefaults(pod *api.Pod, url string) error { } glog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, url) - // Always overrides the namespace. - pod.Namespace = kubelet.NamespaceDefault + if pod.Namespace == "" { + pod.Namespace = kubelet.NamespaceDefault + } glog.V(5).Infof("Using namespace %q for pod %q from URL %s", pod.Namespace, pod.Name, url) pod.ObjectMeta.SelfLink = fmt.Sprintf("/api/v1beta2/pods/%s?namespace=%s", pod.Name, pod.Namespace) diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index 59baba517ba..533769c67ef 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta3" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -117,7 +118,7 @@ func TestExtractInvalidManifest(t *testing.T) { } } -func TestExtractFromHTTP(t *testing.T) { +func TestExtractManifestFromHTTP(t *testing.T) { hostname, _ := os.Hostname() hostname = strings.ToLower(hostname) @@ -281,3 +282,186 @@ func TestExtractFromHTTP(t *testing.T) { } } } + +func TestExtractPodsFromHTTP(t *testing.T) { + hostname, _ := os.Hostname() + hostname = strings.ToLower(hostname) + + var testCases = []struct { + desc string + pods interface{} + expected kubelet.PodUpdate + }{ + { + desc: "Single pod v1beta1", + pods: v1beta1.Pod{ + TypeMeta: v1beta1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1beta1", + ID: "foo", + UID: "111", + Namespace: "mynamespace", + }, + DesiredState: v1beta1.PodState{ + Manifest: v1beta1.ContainerManifest{ + Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}, + }, + }, + }, + expected: CreatePodUpdate(kubelet.SET, + kubelet.HTTPSource, + api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "111", + Name: "foo" + "-" + hostname, + Namespace: "mynamespace", + SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace", + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + Containers: []api.Container{{ + Name: "1", + Image: "foo", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: "Always"}}, + }, + }), + }, + { + desc: "Single pod v1beta3", + pods: v1beta3.Pod{ + TypeMeta: v1beta3.TypeMeta{ + Kind: "Pod", + APIVersion: "v1beta3", + }, + ObjectMeta: v1beta3.ObjectMeta{ + Name: "foo", + UID: "111", + Namespace: "mynamespace", + }, + Spec: v1beta3.PodSpec{ + Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}}, + }, + }, + expected: CreatePodUpdate(kubelet.SET, + kubelet.HTTPSource, + api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "111", + Name: "foo" + "-" + hostname, + Namespace: "mynamespace", + SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace", + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + Containers: []api.Container{{ + Name: "1", + Image: "foo", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: "Always"}}, + }, + }), + }, + { + desc: "Multiple pods", + pods: v1beta3.PodList{ + TypeMeta: v1beta3.TypeMeta{ + Kind: "PodList", + APIVersion: "v1beta3", + }, + Items: []v1beta3.Pod{ + { + ObjectMeta: v1beta3.ObjectMeta{ + Name: "foo", + UID: "111", + }, + Spec: v1beta3.PodSpec{ + Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}}, + }, + }, + { + ObjectMeta: v1beta3.ObjectMeta{ + Name: "bar", + UID: "222", + }, + Spec: v1beta3.PodSpec{ + Containers: []v1beta3.Container{{Name: "2", Image: "bar", ImagePullPolicy: ""}}, + }, + }, + }, + }, + expected: CreatePodUpdate(kubelet.SET, + kubelet.HTTPSource, + api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "111", + Name: "foo" + "-" + hostname, + Namespace: "default", + SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default", + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + Containers: []api.Container{{ + Name: "1", + Image: "foo", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: "Always"}}, + }, + }, + api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "222", + Name: "bar" + "-" + hostname, + Namespace: "default", + SelfLink: "/api/v1beta2/pods/bar-" + hostname + "?namespace=default", + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + Containers: []api.Container{{ + Name: "2", + Image: "bar", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: "IfNotPresent"}}, + }, + }), + }, + { + desc: "Empty Array", + pods: []v1beta3.Pod{}, + expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource), + }, + } + + for _, testCase := range testCases { + data, err := json.Marshal(testCase.pods) + if err != nil { + t.Fatalf("%s: Some weird json problem: %v", testCase.desc, err) + } + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + ch := make(chan interface{}, 1) + c := sourceURL{testServer.URL, ch, nil} + if err := c.extractFromURL(); err != nil { + t.Errorf("%s: Unexpected error: %v", testCase.desc, err) + continue + } + update := (<-ch).(kubelet.PodUpdate) + + if !api.Semantic.DeepEqual(testCase.expected, update) { + t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update) + } + for i := range update.Pods { + if errs := validation.ValidatePod(&update.Pods[i]); len(errs) != 0 { + t.Errorf("%s: Expected no validation errors on %#v, Got %v", testCase.desc, update.Pods[i], errors.NewAggregate(errs)) + } + } + } +}