Merge pull request #3701 from thockin/fix-klet-manifest-reading

Make kubelet's file source go through conversion
This commit is contained in:
Dawn Chen 2015-01-22 14:40:40 -08:00
commit 6db42eda4e
4 changed files with 241 additions and 54 deletions

View File

@ -29,6 +29,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -145,13 +146,26 @@ func extractFromFile(filename string) (api.BoundPod, error) {
return pod, err
}
manifest := &api.ContainerManifest{}
// TODO: use api.Scheme.DecodeInto
if err := yaml.Unmarshal(data, manifest); err != nil {
// This is awful. DecodeInto() expects to find an APIObject, which
// Manifest is not. We keep reading manifest for now for compat, but
// we will eventually change it to read Pod (at which point this all
// becomes nicer). Until then, we assert that the ContainerManifest
// structure on disk is always v1beta1. Read that, convert it to a
// "current" ContainerManifest (should be ~identical), then convert
// that to a BoundPod (which is a well-understood conversion). This
// avoids writing a v1beta1.ContainerManifest -> api.BoundPod
// conversion which would be identical to the api.ContainerManifest ->
// api.BoundPod conversion.
oldManifest := &v1beta1.ContainerManifest{}
if err := yaml.Unmarshal(data, oldManifest); err != nil {
return pod, fmt.Errorf("can't unmarshal file %q: %v", filename, err)
}
if err := api.Scheme.Convert(manifest, &pod); err != nil {
newManifest := &api.ContainerManifest{}
if err := api.Scheme.Convert(oldManifest, newManifest); err != nil {
return pod, fmt.Errorf("can't convert pod from file %q: %v", filename, err)
}
if err := api.Scheme.Convert(newManifest, &pod); err != nil {
return pod, fmt.Errorf("can't convert pod from file %q: %v", filename, err)
}

View File

@ -26,27 +26,28 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
func ExampleManifestAndPod(id string) (api.ContainerManifest, api.BoundPod) {
manifest := api.ContainerManifest{
func ExampleManifestAndPod(id string) (v1beta1.ContainerManifest, api.BoundPod) {
manifest := v1beta1.ContainerManifest{
ID: id,
UUID: types.UID(id),
Containers: []api.Container{
Containers: []v1beta1.Container{
{
Name: "c" + id,
Image: "foo",
TerminationMessagePath: "/somepath",
},
},
Volumes: []api.Volume{
Volumes: []v1beta1.Volume{
{
Name: "host-dir",
Source: api.VolumeSource{
HostPath: &api.HostPath{"/dir/path"},
Source: v1beta1.VolumeSource{
HostDir: &v1beta1.HostPath{"/dir/path"},
},
},
},
@ -120,7 +121,7 @@ func TestReadFromFile(t *testing.T) {
"version": "v1beta1",
"uuid": "12345",
"id": "test",
"containers": [{ "image": "test/image" }]
"containers": [{ "image": "test/image", imagePullPolicy: "PullAlways"}]
}`)
defer os.Remove(file.Name())
@ -137,7 +138,13 @@ func TestReadFromFile(t *testing.T) {
SelfLink: "",
},
Spec: api.PodSpec{
Containers: []api.Container{{Image: "test/image", TerminationMessagePath: "/dev/termination-log"}},
Containers: []api.Container{
{
Image: "test/image",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: api.PullAlways,
},
},
},
})
@ -163,6 +170,99 @@ func TestReadFromFile(t *testing.T) {
}
}
func TestReadFromFileWithoutID(t *testing.T) {
file := writeTestFile(t, os.TempDir(), "test_pod_config",
`{
"version": "v1beta1",
"uuid": "12345",
"containers": [{ "image": "test/image", imagePullPolicy: "PullAlways"}]
}`)
defer os.Remove(file.Name())
ch := make(chan interface{})
NewSourceFile(file.Name(), time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: "",
UID: "12345",
Namespace: "",
SelfLink: "",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "test/image",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: api.PullAlways,
},
},
},
})
if len(update.Pods[0].ObjectMeta.Name) == 0 {
t.Errorf("Name did not get defaulted")
}
update.Pods[0].ObjectMeta.Name = ""
update.Pods[0].ObjectMeta.Namespace = ""
update.Pods[0].ObjectMeta.SelfLink = ""
if !api.Semantic.DeepEqual(expected, update) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(2 * time.Millisecond):
t.Errorf("Expected update, timeout instead")
}
}
func TestReadV1Beta2FromFile(t *testing.T) {
file := writeTestFile(t, os.TempDir(), "test_pod_config",
`{
"version": "v1beta2",
"uuid": "12345",
"id": "test",
"containers": [{ "image": "test/image", imagePullPolicy: "PullAlways"}]
}`)
defer os.Remove(file.Name())
ch := make(chan interface{})
NewSourceFile(file.Name(), time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: "test",
UID: "12345",
Namespace: "",
SelfLink: "",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "test/image",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: api.PullAlways,
},
},
},
})
update.Pods[0].ObjectMeta.Namespace = ""
update.Pods[0].ObjectMeta.SelfLink = ""
if !api.Semantic.DeepEqual(expected, update) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(2 * time.Millisecond):
t.Errorf("Expected update, timeout instead")
}
}
func TestReadFromFileWithDefaults(t *testing.T) {
file := writeTestFile(t, os.TempDir(), "test_pod_config",
`{
@ -224,7 +324,7 @@ func TestExtractFromDir(t *testing.T) {
manifest, expectedPod := ExampleManifestAndPod("1")
manifest2, expectedPod2 := ExampleManifestAndPod("2")
manifests := []api.ContainerManifest{manifest, manifest2}
manifests := []v1beta1.ContainerManifest{manifest, manifest2}
pods := []api.BoundPod{expectedPod, expectedPod2}
files := make([]*os.File, len(manifests))

View File

@ -28,6 +28,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -84,40 +85,25 @@ func (s *sourceURL) extractFromURL() error {
s.data = data
// First try as if it's a single manifest
var manifest api.ContainerManifest
// TODO: should be api.Scheme.Decode
singleErr := yaml.Unmarshal(data, &manifest)
if singleErr == nil {
if errs := validation.ValidateManifest(&manifest); len(errs) > 0 {
singleErr = fmt.Errorf("invalid manifest: %v", errs)
}
}
if singleErr == nil {
pod := api.BoundPod{}
if err := api.Scheme.Convert(&manifest, &pod); err != nil {
return err
parsed, manifest, pod, singleErr := tryDecodeSingle(data)
if parsed {
if singleErr != nil {
// It parsed but could not be used.
return singleErr
}
// It parsed!
applyDefaults(&pod, s.url)
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.HTTPSource}
return nil
}
// That didn't work, so try an array of manifests.
var manifests []api.ContainerManifest
// TODO: should be api.Scheme.Decode
multiErr := yaml.Unmarshal(data, &manifests)
// We're not sure if the person reading the logs is going to care about the single or
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
// done at the end. Hence not returning early here.
if multiErr == nil {
for _, manifest := range manifests {
if errs := validation.ValidateManifest(&manifest); len(errs) > 0 {
multiErr = fmt.Errorf("invalid manifest: %v", errs)
break
}
parsed, manifests, pods, multiErr := tryDecodeList(data)
if parsed {
if multiErr != nil {
// It parsed but could not be used.
return multiErr
}
}
if multiErr == nil {
// A single manifest that did not pass semantic validation will yield an empty
// array of manifests (and no error) when unmarshaled as such. In that case,
// if the single manifest at least had a Version, we return the single-manifest
@ -125,16 +111,12 @@ func (s *sourceURL) extractFromURL() error {
if len(manifests) == 0 && len(manifest.Version) != 0 {
return singleErr
}
list := api.ContainerManifestList{Items: manifests}
boundPods := &api.BoundPods{}
if err := api.Scheme.Convert(&list, boundPods); err != nil {
return err
}
for i := range boundPods.Items {
pod := &boundPods.Items[i]
// Assume it parsed.
for i := range pods.Items {
pod := &pods.Items[i]
applyDefaults(pod, s.url)
}
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.HTTPSource}
s.updates <- kubelet.PodUpdate{pods.Items, kubelet.SET, kubelet.HTTPSource}
return nil
}
@ -143,6 +125,61 @@ func (s *sourceURL) extractFromURL() error {
s.url, string(data), singleErr, manifest, multiErr, manifests)
}
func tryDecodeSingle(data []byte) (parsed bool, manifest v1beta1.ContainerManifest, pod api.BoundPod, err error) {
// TODO: should be api.Scheme.Decode
// This is awful. DecodeInto() expects to find an APIObject, which
// Manifest is not. We keep reading manifest for now for compat, but
// we will eventually change it to read Pod (at which point this all
// becomes nicer). Until then, we assert that the ContainerManifest
// structure on disk is always v1beta1. Read that, convert it to a
// "current" ContainerManifest (should be ~identical), then convert
// that to a BoundPod (which is a well-understood conversion). This
// avoids writing a v1beta1.ContainerManifest -> api.BoundPod
// conversion which would be identical to the api.ContainerManifest ->
// api.BoundPod conversion.
if err = yaml.Unmarshal(data, &manifest); err != nil {
return false, manifest, pod, err
}
newManifest := api.ContainerManifest{}
if err = api.Scheme.Convert(&manifest, &newManifest); err != nil {
return false, manifest, pod, err
}
if errs := validation.ValidateManifest(&newManifest); len(errs) > 0 {
err = fmt.Errorf("invalid manifest: %v", errs)
return false, manifest, pod, err
}
if err = api.Scheme.Convert(&newManifest, &pod); err != nil {
return true, manifest, pod, err
}
// Success.
return true, manifest, pod, nil
}
func tryDecodeList(data []byte) (parsed bool, manifests []v1beta1.ContainerManifest, pods api.BoundPods, err error) {
// TODO: should be api.Scheme.Decode
// See the comment in tryDecodeSingle().
if err = yaml.Unmarshal(data, &manifests); err != nil {
return false, manifests, pods, err
}
newManifests := []api.ContainerManifest{}
if err = api.Scheme.Convert(&manifests, &newManifests); err != nil {
return false, manifests, pods, err
}
for i := range newManifests {
manifest := &newManifests[i]
if errs := validation.ValidateManifest(manifest); len(errs) > 0 {
err = fmt.Errorf("invalid manifest: %v", errs)
return false, manifests, pods, err
}
}
list := api.ContainerManifestList{Items: newManifests}
if err = api.Scheme.Convert(&list, &pods); err != nil {
return true, manifests, pods, err
}
// Success.
return true, manifests, pods, nil
}
func applyDefaults(pod *api.BoundPod, url string) {
if len(pod.UID) == 0 {
hasher := md5.New()

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -122,8 +123,9 @@ func TestExtractFromHTTP(t *testing.T) {
expected kubelet.PodUpdate
}{
{
desc: "Single manifest",
manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo", UUID: "111"},
desc: "Single manifest",
manifests: v1beta1.ContainerManifest{Version: "v1beta1", ID: "foo", UUID: "111",
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.BoundPod{
@ -135,6 +137,11 @@ func TestExtractFromHTTP(t *testing.T) {
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "Always"}},
},
}),
},
@ -155,11 +162,36 @@ func TestExtractFromHTTP(t *testing.T) {
},
}),
},
{
desc: "Single manifest with v1beta2",
manifests: v1beta1.ContainerManifest{Version: "v1beta2", ID: "foo", UUID: "111",
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
api.BoundPod{
ObjectMeta: api.ObjectMeta{
UID: "111",
Name: "foo",
Namespace: "foobar",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "Always"}},
},
}),
},
{
desc: "Multiple manifests",
manifests: []api.ContainerManifest{
{Version: "v1beta1", ID: "foo", UUID: "111", Containers: []api.Container{{Name: "1", Image: "foo"}}},
{Version: "v1beta1", ID: "bar", UUID: "222", Containers: []api.Container{{Name: "1", Image: "foo"}}},
manifests: []v1beta1.ContainerManifest{
{Version: "v1beta1", ID: "foo", UUID: "111",
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}},
{Version: "v1beta1", ID: "bar", UUID: "222",
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: ""}}},
},
expected: CreatePodUpdate(kubelet.SET,
kubelet.HTTPSource,
@ -170,11 +202,13 @@ func TestExtractFromHTTP(t *testing.T) {
Namespace: "foobar",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "IfNotPresent"}},
ImagePullPolicy: "Always"}},
},
},
api.BoundPod{
@ -184,6 +218,8 @@ func TestExtractFromHTTP(t *testing.T) {
Namespace: "foobar",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
@ -194,7 +230,7 @@ func TestExtractFromHTTP(t *testing.T) {
},
{
desc: "Empty Array",
manifests: []api.ContainerManifest{},
manifests: []v1beta1.ContainerManifest{},
expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource),
},
}