mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Make kubelet's HTTP source go through conversion
This commit is contained in:
parent
4ff2865cd1
commit
d63162b7e7
@ -170,6 +170,99 @@ func TestReadFromFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadFromFileWithoutID(t *testing.T) {
|
||||||
|
file := writeTestFile(t, os.TempDir(), "test_pod_config",
|
||||||
|
`{
|
||||||
|
"version": "v1beta1",
|
||||||
|
"uuid": "12345",
|
||||||
|
"containers": [{ "image": "test/image", imagePullPolicy: "PullAlways"}]
|
||||||
|
}`)
|
||||||
|
defer os.Remove(file.Name())
|
||||||
|
|
||||||
|
ch := make(chan interface{})
|
||||||
|
NewSourceFile(file.Name(), time.Millisecond, ch)
|
||||||
|
select {
|
||||||
|
case got := <-ch:
|
||||||
|
update := got.(kubelet.PodUpdate)
|
||||||
|
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "",
|
||||||
|
UID: "12345",
|
||||||
|
Namespace: "",
|
||||||
|
SelfLink: "",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{
|
||||||
|
Image: "test/image",
|
||||||
|
TerminationMessagePath: "/dev/termination-log",
|
||||||
|
ImagePullPolicy: api.PullAlways,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(update.Pods[0].ObjectMeta.Name) == 0 {
|
||||||
|
t.Errorf("Name did not get defaulted")
|
||||||
|
}
|
||||||
|
update.Pods[0].ObjectMeta.Name = ""
|
||||||
|
update.Pods[0].ObjectMeta.Namespace = ""
|
||||||
|
update.Pods[0].ObjectMeta.SelfLink = ""
|
||||||
|
|
||||||
|
if !api.Semantic.DeepEqual(expected, update) {
|
||||||
|
t.Fatalf("Expected %#v, Got %#v", expected, update)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-time.After(2 * time.Millisecond):
|
||||||
|
t.Errorf("Expected update, timeout instead")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadV1Beta2FromFile(t *testing.T) {
|
||||||
|
file := writeTestFile(t, os.TempDir(), "test_pod_config",
|
||||||
|
`{
|
||||||
|
"version": "v1beta2",
|
||||||
|
"uuid": "12345",
|
||||||
|
"id": "test",
|
||||||
|
"containers": [{ "image": "test/image", imagePullPolicy: "PullAlways"}]
|
||||||
|
}`)
|
||||||
|
defer os.Remove(file.Name())
|
||||||
|
|
||||||
|
ch := make(chan interface{})
|
||||||
|
NewSourceFile(file.Name(), time.Millisecond, ch)
|
||||||
|
select {
|
||||||
|
case got := <-ch:
|
||||||
|
update := got.(kubelet.PodUpdate)
|
||||||
|
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource, api.BoundPod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "test",
|
||||||
|
UID: "12345",
|
||||||
|
Namespace: "",
|
||||||
|
SelfLink: "",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{
|
||||||
|
Image: "test/image",
|
||||||
|
TerminationMessagePath: "/dev/termination-log",
|
||||||
|
ImagePullPolicy: api.PullAlways,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
update.Pods[0].ObjectMeta.Namespace = ""
|
||||||
|
update.Pods[0].ObjectMeta.SelfLink = ""
|
||||||
|
|
||||||
|
if !api.Semantic.DeepEqual(expected, update) {
|
||||||
|
t.Fatalf("Expected %#v, Got %#v", expected, update)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-time.After(2 * time.Millisecond):
|
||||||
|
t.Errorf("Expected update, timeout instead")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReadFromFileWithDefaults(t *testing.T) {
|
func TestReadFromFileWithDefaults(t *testing.T) {
|
||||||
file := writeTestFile(t, os.TempDir(), "test_pod_config",
|
file := writeTestFile(t, os.TempDir(), "test_pod_config",
|
||||||
`{
|
`{
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||||
@ -84,40 +85,25 @@ func (s *sourceURL) extractFromURL() error {
|
|||||||
s.data = data
|
s.data = data
|
||||||
|
|
||||||
// First try as if it's a single manifest
|
// First try as if it's a single manifest
|
||||||
var manifest api.ContainerManifest
|
parsed, manifest, pod, singleErr := tryDecodeSingle(data)
|
||||||
// TODO: should be api.Scheme.Decode
|
if parsed {
|
||||||
singleErr := yaml.Unmarshal(data, &manifest)
|
if singleErr != nil {
|
||||||
if singleErr == nil {
|
// It parsed but could not be used.
|
||||||
if errs := validation.ValidateManifest(&manifest); len(errs) > 0 {
|
return singleErr
|
||||||
singleErr = fmt.Errorf("invalid manifest: %v", errs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if singleErr == nil {
|
|
||||||
pod := api.BoundPod{}
|
|
||||||
if err := api.Scheme.Convert(&manifest, &pod); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
// It parsed!
|
||||||
applyDefaults(&pod, s.url)
|
applyDefaults(&pod, s.url)
|
||||||
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.HTTPSource}
|
s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET, kubelet.HTTPSource}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// That didn't work, so try an array of manifests.
|
// That didn't work, so try an array of manifests.
|
||||||
var manifests []api.ContainerManifest
|
parsed, manifests, pods, multiErr := tryDecodeList(data)
|
||||||
// TODO: should be api.Scheme.Decode
|
if parsed {
|
||||||
multiErr := yaml.Unmarshal(data, &manifests)
|
if multiErr != nil {
|
||||||
// We're not sure if the person reading the logs is going to care about the single or
|
// It parsed but could not be used.
|
||||||
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
|
return multiErr
|
||||||
// done at the end. Hence not returning early here.
|
|
||||||
if multiErr == nil {
|
|
||||||
for _, manifest := range manifests {
|
|
||||||
if errs := validation.ValidateManifest(&manifest); len(errs) > 0 {
|
|
||||||
multiErr = fmt.Errorf("invalid manifest: %v", errs)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
if multiErr == nil {
|
|
||||||
// A single manifest that did not pass semantic validation will yield an empty
|
// A single manifest that did not pass semantic validation will yield an empty
|
||||||
// array of manifests (and no error) when unmarshaled as such. In that case,
|
// array of manifests (and no error) when unmarshaled as such. In that case,
|
||||||
// if the single manifest at least had a Version, we return the single-manifest
|
// if the single manifest at least had a Version, we return the single-manifest
|
||||||
@ -125,16 +111,12 @@ func (s *sourceURL) extractFromURL() error {
|
|||||||
if len(manifests) == 0 && len(manifest.Version) != 0 {
|
if len(manifests) == 0 && len(manifest.Version) != 0 {
|
||||||
return singleErr
|
return singleErr
|
||||||
}
|
}
|
||||||
list := api.ContainerManifestList{Items: manifests}
|
// Assume it parsed.
|
||||||
boundPods := &api.BoundPods{}
|
for i := range pods.Items {
|
||||||
if err := api.Scheme.Convert(&list, boundPods); err != nil {
|
pod := &pods.Items[i]
|
||||||
return err
|
|
||||||
}
|
|
||||||
for i := range boundPods.Items {
|
|
||||||
pod := &boundPods.Items[i]
|
|
||||||
applyDefaults(pod, s.url)
|
applyDefaults(pod, s.url)
|
||||||
}
|
}
|
||||||
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.HTTPSource}
|
s.updates <- kubelet.PodUpdate{pods.Items, kubelet.SET, kubelet.HTTPSource}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,6 +125,61 @@ func (s *sourceURL) extractFromURL() error {
|
|||||||
s.url, string(data), singleErr, manifest, multiErr, manifests)
|
s.url, string(data), singleErr, manifest, multiErr, manifests)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func tryDecodeSingle(data []byte) (parsed bool, manifest v1beta1.ContainerManifest, pod api.BoundPod, err error) {
|
||||||
|
// TODO: should be api.Scheme.Decode
|
||||||
|
// This is awful. DecodeInto() expects to find an APIObject, which
|
||||||
|
// Manifest is not. We keep reading manifest for now for compat, but
|
||||||
|
// we will eventually change it to read Pod (at which point this all
|
||||||
|
// becomes nicer). Until then, we assert that the ContainerManifest
|
||||||
|
// structure on disk is always v1beta1. Read that, convert it to a
|
||||||
|
// "current" ContainerManifest (should be ~identical), then convert
|
||||||
|
// that to a BoundPod (which is a well-understood conversion). This
|
||||||
|
// avoids writing a v1beta1.ContainerManifest -> api.BoundPod
|
||||||
|
// conversion which would be identical to the api.ContainerManifest ->
|
||||||
|
// api.BoundPod conversion.
|
||||||
|
if err = yaml.Unmarshal(data, &manifest); err != nil {
|
||||||
|
return false, manifest, pod, err
|
||||||
|
}
|
||||||
|
newManifest := api.ContainerManifest{}
|
||||||
|
if err = api.Scheme.Convert(&manifest, &newManifest); err != nil {
|
||||||
|
return false, manifest, pod, err
|
||||||
|
}
|
||||||
|
if errs := validation.ValidateManifest(&newManifest); len(errs) > 0 {
|
||||||
|
err = fmt.Errorf("invalid manifest: %v", errs)
|
||||||
|
return false, manifest, pod, err
|
||||||
|
}
|
||||||
|
if err = api.Scheme.Convert(&newManifest, &pod); err != nil {
|
||||||
|
return true, manifest, pod, err
|
||||||
|
}
|
||||||
|
// Success.
|
||||||
|
return true, manifest, pod, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func tryDecodeList(data []byte) (parsed bool, manifests []v1beta1.ContainerManifest, pods api.BoundPods, err error) {
|
||||||
|
// TODO: should be api.Scheme.Decode
|
||||||
|
// See the comment in tryDecodeSingle().
|
||||||
|
if err = yaml.Unmarshal(data, &manifests); err != nil {
|
||||||
|
return false, manifests, pods, err
|
||||||
|
}
|
||||||
|
newManifests := []api.ContainerManifest{}
|
||||||
|
if err = api.Scheme.Convert(&manifests, &newManifests); err != nil {
|
||||||
|
return false, manifests, pods, err
|
||||||
|
}
|
||||||
|
for i := range newManifests {
|
||||||
|
manifest := &newManifests[i]
|
||||||
|
if errs := validation.ValidateManifest(manifest); len(errs) > 0 {
|
||||||
|
err = fmt.Errorf("invalid manifest: %v", errs)
|
||||||
|
return false, manifests, pods, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
list := api.ContainerManifestList{Items: newManifests}
|
||||||
|
if err = api.Scheme.Convert(&list, &pods); err != nil {
|
||||||
|
return true, manifests, pods, err
|
||||||
|
}
|
||||||
|
// Success.
|
||||||
|
return true, manifests, pods, nil
|
||||||
|
}
|
||||||
|
|
||||||
func applyDefaults(pod *api.BoundPod, url string) {
|
func applyDefaults(pod *api.BoundPod, url string) {
|
||||||
if len(pod.UID) == 0 {
|
if len(pod.UID) == 0 {
|
||||||
hasher := md5.New()
|
hasher := md5.New()
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
@ -123,7 +124,8 @@ func TestExtractFromHTTP(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "Single manifest",
|
desc: "Single manifest",
|
||||||
manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo", UUID: "111"},
|
manifests: v1beta1.ContainerManifest{Version: "v1beta1", ID: "foo", UUID: "111",
|
||||||
|
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}},
|
||||||
expected: CreatePodUpdate(kubelet.SET,
|
expected: CreatePodUpdate(kubelet.SET,
|
||||||
kubelet.HTTPSource,
|
kubelet.HTTPSource,
|
||||||
api.BoundPod{
|
api.BoundPod{
|
||||||
@ -135,6 +137,11 @@ func TestExtractFromHTTP(t *testing.T) {
|
|||||||
Spec: api.PodSpec{
|
Spec: api.PodSpec{
|
||||||
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
|
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
|
||||||
DNSPolicy: api.DNSClusterFirst,
|
DNSPolicy: api.DNSClusterFirst,
|
||||||
|
Containers: []api.Container{{
|
||||||
|
Name: "1",
|
||||||
|
Image: "foo",
|
||||||
|
TerminationMessagePath: "/dev/termination-log",
|
||||||
|
ImagePullPolicy: "Always"}},
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
@ -155,11 +162,36 @@ func TestExtractFromHTTP(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "Single manifest with v1beta2",
|
||||||
|
manifests: v1beta1.ContainerManifest{Version: "v1beta2", ID: "foo", UUID: "111",
|
||||||
|
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}},
|
||||||
|
expected: CreatePodUpdate(kubelet.SET,
|
||||||
|
kubelet.HTTPSource,
|
||||||
|
api.BoundPod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
UID: "111",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "foobar",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
|
||||||
|
DNSPolicy: api.DNSClusterFirst,
|
||||||
|
Containers: []api.Container{{
|
||||||
|
Name: "1",
|
||||||
|
Image: "foo",
|
||||||
|
TerminationMessagePath: "/dev/termination-log",
|
||||||
|
ImagePullPolicy: "Always"}},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
},
|
||||||
{
|
{
|
||||||
desc: "Multiple manifests",
|
desc: "Multiple manifests",
|
||||||
manifests: []api.ContainerManifest{
|
manifests: []v1beta1.ContainerManifest{
|
||||||
{Version: "v1beta1", ID: "foo", UUID: "111", Containers: []api.Container{{Name: "1", Image: "foo"}}},
|
{Version: "v1beta1", ID: "foo", UUID: "111",
|
||||||
{Version: "v1beta1", ID: "bar", UUID: "222", Containers: []api.Container{{Name: "1", Image: "foo"}}},
|
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta1.PullAlways}}},
|
||||||
|
{Version: "v1beta1", ID: "bar", UUID: "222",
|
||||||
|
Containers: []v1beta1.Container{{Name: "1", Image: "foo", ImagePullPolicy: ""}}},
|
||||||
},
|
},
|
||||||
expected: CreatePodUpdate(kubelet.SET,
|
expected: CreatePodUpdate(kubelet.SET,
|
||||||
kubelet.HTTPSource,
|
kubelet.HTTPSource,
|
||||||
@ -170,11 +202,13 @@ func TestExtractFromHTTP(t *testing.T) {
|
|||||||
Namespace: "foobar",
|
Namespace: "foobar",
|
||||||
},
|
},
|
||||||
Spec: api.PodSpec{
|
Spec: api.PodSpec{
|
||||||
|
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
|
||||||
|
DNSPolicy: api.DNSClusterFirst,
|
||||||
Containers: []api.Container{{
|
Containers: []api.Container{{
|
||||||
Name: "1",
|
Name: "1",
|
||||||
Image: "foo",
|
Image: "foo",
|
||||||
TerminationMessagePath: "/dev/termination-log",
|
TerminationMessagePath: "/dev/termination-log",
|
||||||
ImagePullPolicy: "IfNotPresent"}},
|
ImagePullPolicy: "Always"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
api.BoundPod{
|
api.BoundPod{
|
||||||
@ -184,6 +218,8 @@ func TestExtractFromHTTP(t *testing.T) {
|
|||||||
Namespace: "foobar",
|
Namespace: "foobar",
|
||||||
},
|
},
|
||||||
Spec: api.PodSpec{
|
Spec: api.PodSpec{
|
||||||
|
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
|
||||||
|
DNSPolicy: api.DNSClusterFirst,
|
||||||
Containers: []api.Container{{
|
Containers: []api.Container{{
|
||||||
Name: "1",
|
Name: "1",
|
||||||
Image: "foo",
|
Image: "foo",
|
||||||
@ -194,7 +230,7 @@ func TestExtractFromHTTP(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "Empty Array",
|
desc: "Empty Array",
|
||||||
manifests: []api.ContainerManifest{},
|
manifests: []v1beta1.ContainerManifest{},
|
||||||
expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource),
|
expected: CreatePodUpdate(kubelet.SET, kubelet.HTTPSource),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user