From 4380637be75b8fd3768748a920b0943e3119b27f Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 9 Oct 2014 20:30:34 -0700 Subject: [PATCH] Add update to the pod etcd handler. --- pkg/api/validation/validation.go | 25 ++++ pkg/api/validation/validation_test.go | 173 ++++++++++++++++++++++++++ pkg/registry/etcd/etcd.go | 41 +++++- pkg/registry/etcd/etcd_test.go | 142 +++++++++++++++++++++ 4 files changed, 380 insertions(+), 1 deletion(-) diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 3b1e597518c..afe6e0e994c 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -17,6 +17,7 @@ limitations under the License. package validation import ( + "reflect" "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -338,6 +339,30 @@ func ValidatePod(pod *api.Pod) errs.ErrorList { return allErrs } +// ValidatePodUpdate tests to see if the update is legal +func ValidatePodUpdate(newPod, oldPod *api.Pod) errs.ErrorList { + allErrs := errs.ErrorList{} + + if len(newPod.DesiredState.Manifest.Containers) != len(oldPod.DesiredState.Manifest.Containers) { + allErrs = append(allErrs, errs.NewFieldInvalid("DesiredState.Manifest.Containers", newPod.DesiredState.Manifest.Containers)) + return allErrs + } + pod := *newPod + pod.Labels = oldPod.Labels + pod.TypeMeta.ResourceVersion = oldPod.TypeMeta.ResourceVersion + // Tricky, we need to copy the container list so that we don't overwrite the update + var newContainers []api.Container + for ix, container := range pod.DesiredState.Manifest.Containers { + container.Image = oldPod.DesiredState.Manifest.Containers[ix].Image + newContainers = append(newContainers, container) + } + pod.DesiredState.Manifest.Containers = newContainers + if !reflect.DeepEqual(&pod, oldPod) { + allErrs = append(allErrs, errs.NewFieldInvalid("DesiredState.Manifest.Containers", newPod.DesiredState.Manifest.Containers)) + } + return allErrs +} + // ValidateService tests if required fields in the service are set. func ValidateService(service *api.Service) errs.ErrorList { allErrs := errs.ErrorList{} diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 4e802b5444c..423e102d5fb 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -416,6 +416,179 @@ func TestValidatePod(t *testing.T) { } } +func TestValidatePodUpdate(t *testing.T) { + tests := []struct { + a api.Pod + b api.Pod + isValid bool + test string + }{ + {api.Pod{}, api.Pod{}, true, "nothing"}, + { + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + }, + api.Pod{ + TypeMeta: api.TypeMeta{ID: "bar"}, + }, + false, + "ids", + }, + { + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Labels: map[string]string{ + "foo": "bar", + }, + }, + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Labels: map[string]string{ + "bar": "foo", + }, + }, + true, + "labels", + }, + { + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V1", + }, + }, + }, + }, + }, + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V2", + }, + { + Image: "bar:V2", + }, + }, + }, + }, + }, + false, + "more containers", + }, + { + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V1", + }, + }, + }, + }, + }, + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V2", + }, + }, + }, + }, + }, + true, + "image change", + }, + { + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V1", + CPU: 100, + }, + }, + }, + }, + }, + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V2", + CPU: 1000, + }, + }, + }, + }, + }, + false, + "cpu change", + }, + { + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V1", + Ports: []api.Port{ + {HostPort: 8080, ContainerPort: 80}, + }, + }, + }, + }, + }, + }, + api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Image: "foo:V2", + Ports: []api.Port{ + {HostPort: 8000, ContainerPort: 80}, + }, + }, + }, + }, + }, + }, + false, + "port change", + }, + } + + for _, test := range tests { + errs := ValidatePodUpdate(&test.a, &test.b) + if test.isValid { + if len(errs) != 0 { + t.Errorf("unexpected invalid: %s %v, %v", test.test, test.a, test.b) + } + } else { + if len(errs) == 0 { + t.Errorf("unexpected valid: %s %v, %v", test.test, test.a, test.b) + } + } + } +} + func TestValidateService(t *testing.T) { testCases := []struct { name string diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 36a5f8feb62..9d0d4860b88 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -21,7 +21,9 @@ import ( "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/constraint" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" @@ -198,7 +200,44 @@ func (r *Registry) assignPod(podID string, machine string) error { } func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error { - return fmt.Errorf("unimplemented!") + var podOut api.Pod + podKey := makePodKey(pod.ID) + err := r.EtcdHelper.ExtractObj(podKey, &podOut, false) + if err != nil { + return err + } + scheduled := podOut.DesiredState.Host != "" + if scheduled { + pod.DesiredState.Host = podOut.DesiredState.Host + // If it's already been scheduled, limit the types of updates we'll accept. + errs := validation.ValidatePodUpdate(pod, &podOut) + if len(errs) != 0 { + return errors.NewInvalid("Pod", pod.ID, errs) + } + } + // There's no race with the scheduler, because either this write will fail because the host + // has been updated, or the host update will fail because this pod has been updated. + err = r.EtcdHelper.SetObj(podKey, pod) + if err != nil { + return err + } + if !scheduled { + // never scheduled, just update. + return nil + } + containerKey := makeContainerKey(podOut.DesiredState.Host) + return r.AtomicUpdate(containerKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) { + manifests := in.(*api.ContainerManifestList) + for ix := range manifests.Items { + if manifests.Items[ix].ID == pod.ID { + manifests.Items[ix] = pod.DesiredState.Manifest + return manifests, nil + } + } + // This really shouldn't happen + glog.Warningf("Couldn't find: %s in %#v", pod.ID, manifests) + return manifests, fmt.Errorf("Failed to update pod, couldn't find %s in %#v", pod.ID, manifests) + }) } // DeletePod deletes an existing pod specified by its ID. diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 8a02c921e46..dbd964ffe8f 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -364,6 +364,148 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { } } +func TestEtcdUpdatePodNotFound(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.TestIndex = true + + key := "/registry/pods/foo" + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + registry := NewTestEtcdRegistry(fakeClient) + podIn := api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + Labels: map[string]string{ + "foo": "bar", + }, + } + err := registry.UpdatePod(ctx, &podIn) + if err == nil { + t.Errorf("unexpected non-error") + } +} + +func TestEtcdUpdatePodNotScheduled(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.TestIndex = true + + key := "/registry/pods/foo" + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + }), 1) + + registry := NewTestEtcdRegistry(fakeClient) + podIn := api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + Labels: map[string]string{ + "foo": "bar", + }, + } + err := registry.UpdatePod(ctx, &podIn) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + response, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var podOut api.Pod + latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut) + if !reflect.DeepEqual(podOut, podIn) { + t.Errorf("expected: %v, got: %v", podOut, podIn) + } +} + +func TestEtcdUpdatePodScheduled(t *testing.T) { + ctx := api.NewContext() + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.TestIndex = true + + key := "/registry/pods/foo" + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{ + Host: "machine", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + { + Image: "foo:v1", + }, + }, + }, + }, + }), 1) + + contKey := "/registry/hosts/machine/kubelet" + fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ + Items: []api.ContainerManifest{ + { + ID: "foo", + Containers: []api.Container{ + { + Image: "foo:v1", + }, + }, + }, + { + ID: "bar", + Containers: []api.Container{ + { + Image: "bar:v1", + }, + }, + }, + }, + }), 0) + + registry := NewTestEtcdRegistry(fakeClient) + podIn := api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + { + Image: "foo:v2", + }, + }, + }, + }, + Labels: map[string]string{ + "foo": "bar", + }, + } + err := registry.UpdatePod(ctx, &podIn) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + response, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var podOut api.Pod + latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut) + podIn.DesiredState.Host = "machine" + if !reflect.DeepEqual(podOut, podIn) { + t.Errorf("expected: %#v, got: %#v", podOut, podIn) + } + + response, err = fakeClient.Get(contKey, false, false) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var list api.ContainerManifestList + latest.Codec.DecodeInto([]byte(response.Node.Value), &list) + if len(list.Items) != 2 || !reflect.DeepEqual(list.Items[0], podIn.DesiredState.Manifest) { + t.Errorf("unexpected container list: %d %v %v", len(list.Items), list.Items[0], podIn.DesiredState.Manifest) + } +} + func TestEtcdDeletePod(t *testing.T) { ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t)