Merge pull request #5598 from wojtek-t/kubelet_http_read_pod

Kubelet read pods from http
This commit is contained in:
Victor Marmol 2015-03-19 11:49:28 -07:00
commit 6f6485909e
2 changed files with 265 additions and 6 deletions

View File

@ -123,9 +123,36 @@ func (s *sourceURL) extractFromURL() error {
return nil
}
return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
"single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n",
s.url, string(data), singleErr, manifest, multiErr, manifests)
// Parsing it as ContainerManifest(s) failed.
// Try to parse it as Pod(s).
// First try as it is a single pod.
parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.url)
if parsed {
if singlePodErr != nil {
// It parsed but could not be used.
return singlePodErr
}
s.updates <- kubelet.PodUpdate{[]api.Pod{pod}, kubelet.SET, kubelet.HTTPSource}
return nil
}
// That didn't work, so try a list of pods.
parsed, pods, multiPodErr := tryDecodePodList(data, s.url)
if parsed {
if multiPodErr != nil {
// It parsed but could not be used.
return multiPodErr
}
s.updates <- kubelet.PodUpdate{pods.Items, kubelet.SET, kubelet.HTTPSource}
return nil
}
return fmt.Errorf("%v: received '%v', but couldn't parse as neither "+
"single (%v: %+v) or multiple manifests (%v: %+v) nor "+
"single (%v) or multiple pods (%v).\n",
s.url, string(data), singleErr, manifest, multiErr, manifests,
singlePodErr, multiPodErr)
}
func tryDecodeSingle(data []byte) (parsed bool, manifest v1beta1.ContainerManifest, pod api.Pod, err error) {
@ -183,6 +210,53 @@ func tryDecodeList(data []byte) (parsed bool, manifests []v1beta1.ContainerManif
return true, manifests, pods, nil
}
func tryDecodeSinglePod(data []byte, url string) (parsed bool, pod api.Pod, err error) {
obj, err := api.Scheme.Decode(data)
if err != nil {
return false, pod, err
}
// Check whether the object could be converted to single pod.
if _, ok := obj.(*api.Pod); !ok {
err = fmt.Errorf("invalid pod: %+v", obj)
return false, pod, err
}
newPod := obj.(*api.Pod)
// Apply default values and validate the pod.
if err = applyDefaults(newPod, url); err != nil {
return true, pod, err
}
if errs := validation.ValidatePod(newPod); len(errs) > 0 {
err = fmt.Errorf("invalid pod: %v", errs)
return true, pod, err
}
return true, *newPod, nil
}
func tryDecodePodList(data []byte, url string) (parsed bool, pods api.PodList, err error) {
obj, err := api.Scheme.Decode(data)
if err != nil {
return false, pods, err
}
// Check whether the object could be converted to list of pods.
if _, ok := obj.(*api.PodList); !ok {
err = fmt.Errorf("invalid pods list: %+v", obj)
return false, pods, err
}
newPods := obj.(*api.PodList)
// Apply default values and validate pods.
for i := range newPods.Items {
newPod := &newPods.Items[i]
if err = applyDefaults(newPod, url); err != nil {
return true, pods, err
}
if errs := validation.ValidatePod(newPod); len(errs) > 0 {
err = fmt.Errorf("invalid pod: %v", errs)
return true, pods, err
}
}
return true, *newPods, err
}
func applyDefaults(pod *api.Pod, url string) error {
if len(pod.UID) == 0 {
hasher := md5.New()
@ -203,8 +277,9 @@ func applyDefaults(pod *api.Pod, url string) error {
}
glog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, url)
// Always overrides the namespace.
pod.Namespace = kubelet.NamespaceDefault
if pod.Namespace == "" {
pod.Namespace = kubelet.NamespaceDefault
}
glog.V(5).Infof("Using namespace %q for pod %q from URL %s", pod.Namespace, pod.Name, url)
pod.ObjectMeta.SelfLink = fmt.Sprintf("/api/v1beta2/pods/%s?namespace=%s",
pod.Name, pod.Namespace)

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta3"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -117,7 +118,7 @@ func TestExtractInvalidManifest(t *testing.T) {
}
}
func TestExtractFromHTTP(t *testing.T) {
func TestExtractManifestFromHTTP(t *testing.T) {
hostname, _ := os.Hostname()
hostname = strings.ToLower(hostname)
@ -287,3 +288,186 @@ func TestExtractFromHTTP(t *testing.T) {
}
}
}
func TestExtractPodsFromHTTP(t *testing.T) {
hostname, _ := os.Hostname()
hostname = strings.ToLower(hostname)
var testCases = []struct {
desc string
pods interface{}
expected kubelet.PodUpdate
}{
{
desc: "Single pod v1beta1",
pods: v1beta1.Pod{
TypeMeta: v1beta1.TypeMeta{
Kind: "Pod",
APIVersion: "v1beta1",
ID: "foo",
UID: "111",
Namespace: "mynamespace",
},
DesiredState: v1beta1.PodState{
Manifest: v1beta1.ContainerManifest{
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}},
},
},
},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "111",
Name: "foo" + "-" + hostname,
Namespace: "mynamespace",
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "Always"}},
},
}),
},
{
desc: "Single pod v1beta3",
pods: v1beta3.Pod{
TypeMeta: v1beta3.TypeMeta{
Kind: "Pod",
APIVersion: "v1beta3",
},
ObjectMeta: v1beta3.ObjectMeta{
Name: "foo",
UID: "111",
Namespace: "mynamespace",
},
Spec: v1beta3.PodSpec{
Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}},
},
},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "111",
Name: "foo" + "-" + hostname,
Namespace: "mynamespace",
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "Always"}},
},
}),
},
{
desc: "Multiple pods",
pods: v1beta3.PodList{
TypeMeta: v1beta3.TypeMeta{
Kind: "PodList",
APIVersion: "v1beta3",
},
Items: []v1beta3.Pod{
{
ObjectMeta: v1beta3.ObjectMeta{
Name: "foo",
UID: "111",
},
Spec: v1beta3.PodSpec{
Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}},
},
},
{
ObjectMeta: v1beta3.ObjectMeta{
Name: "bar",
UID: "222",
},
Spec: v1beta3.PodSpec{
Containers: []v1beta3.Container{{Name: "2", Image: "bar", ImagePullPolicy: ""}},
},
},
},
},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "111",
Name: "foo" + "-" + hostname,
Namespace: "default",
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "Always"}},
},
},
api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "222",
Name: "bar" + "-" + hostname,
Namespace: "default",
SelfLink: "/api/v1beta2/pods/bar-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "2",
Image: "bar",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "IfNotPresent"}},
},
}),
},
{
desc: "Empty Array",
pods: []v1beta3.Pod{},
expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource),
},
}
for _, testCase := range testCases {
data, err := json.Marshal(testCase.pods)
if err != nil {
t.Fatalf("%s: Some weird json problem: %v", testCase.desc, err)
}
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
ch := make(chan interface{}, 1)
c := sourceURL{testServer.URL, ch, nil}
if err := c.extractFromURL(); err != nil {
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
continue
}
update := (<-ch).(kubelet.PodUpdate)
if !api.Semantic.DeepEqual(testCase.expected, update) {
t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
}
for i := range update.Pods {
if errs := validation.ValidatePod(&update.Pods[i]); len(errs) != 0 {
t.Errorf("%s: Expected no validation errors on %#v, Got %v", testCase.desc, update.Pods[i], errors.NewAggregate(errs))
}
}
}
}