Stop updating boundPods objects.

Does not clean up your existing boundPods records.
Does not clean up all the dead code.  Future PRs from me
will do that.
This commit is contained in:
Eric Tune 2015-03-11 13:34:57 -07:00
parent 7d53425bbc
commit e9b6c75b6a
2 changed files with 5 additions and 264 deletions

View File

@ -31,8 +31,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
) )
// rest implements a RESTStorage for pods against etcd // rest implements a RESTStorage for pods against etcd
@ -64,7 +62,7 @@ func NewREST(h tools.EtcdHelper, factory pod.BoundPodFactory) (*REST, *BindingRE
} }
statusStore := *store statusStore := *store
bindings := &podLifecycle{h} bindings := &podLifecycle{}
store.CreateStrategy = pod.Strategy store.CreateStrategy = pod.Strategy
store.UpdateStrategy = pod.Strategy store.UpdateStrategy = pod.Strategy
store.AfterUpdate = bindings.AfterUpdate store.AfterUpdate = bindings.AfterUpdate
@ -130,10 +128,6 @@ func (r *REST) ResourceLocation(ctx api.Context, name string) (string, error) {
return pod.ResourceLocation(r, ctx, name) return pod.ResourceLocation(r, ctx, name)
} }
func makeBoundPodsKey(machine string) string {
return "/registry/nodes/" + machine + "/boundpods"
}
// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use. // BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
type BindingREST struct { type BindingREST struct {
store *etcdgeneric.Etcd store *etcdgeneric.Etcd
@ -185,81 +179,21 @@ func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine s
// assignPod assigns the given pod to the given machine. // assignPod assigns the given pod to the given machine.
func (r *BindingREST) assignPod(ctx api.Context, podID string, machine string) error { func (r *BindingREST) assignPod(ctx api.Context, podID string, machine string) error {
finalPod, err := r.setPodHostTo(ctx, podID, "", machine) _, err := r.setPodHostTo(ctx, podID, "", machine)
if err != nil { if err != nil {
return err return err
} }
boundPod, err := r.factory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
contKey := makeBoundPodsKey(machine)
err = r.store.Helper.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPodList := in.(*api.BoundPods)
boundPodList.Items = append(boundPodList.Items, *boundPod)
return boundPodList, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack, but
// can't really be helped, since there's not really a way to do atomic
// multi-object changes in etcd.
if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil {
glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
}
}
return err return err
} }
type podLifecycle struct { type podLifecycle struct{}
tools.EtcdHelper
}
func (h *podLifecycle) AfterUpdate(obj runtime.Object) error { func (h *podLifecycle) AfterUpdate(obj runtime.Object) error {
pod := obj.(*api.Pod) return nil
if len(pod.Status.Host) == 0 {
return nil
}
containerKey := makeBoundPodsKey(pod.Status.Host)
return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPods := in.(*api.BoundPods)
for ix := range boundPods.Items {
if boundPods.Items[ix].Name == pod.Name && boundPods.Items[ix].Namespace == pod.Namespace {
boundPods.Items[ix].Spec = pod.Spec
return boundPods, nil
}
}
// This really shouldn't happen
glog.Warningf("Couldn't find: %s in %#v", pod.Name, boundPods)
return boundPods, fmt.Errorf("failed to update pod, couldn't find %s in %#v", pod.Name, boundPods)
})
} }
func (h *podLifecycle) AfterDelete(obj runtime.Object) error { func (h *podLifecycle) AfterDelete(obj runtime.Object) error {
pod := obj.(*api.Pod) return nil
if len(pod.Status.Host) == 0 {
return nil
}
containerKey := makeBoundPodsKey(pod.Status.Host)
return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, boundPod := range pods.Items {
if boundPod.Name != pod.Name || boundPod.Namespace != pod.Namespace {
newPods = append(newPods, boundPod)
} else {
found = true
}
}
if !found {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", pod.Name, pods)
}
pods.Items = newPods
return pods, nil
})
} }
// StatusREST implements the REST endpoint for changing the status of a pod. // StatusREST implements the REST endpoint for changing the status of a pod.

View File

@ -669,30 +669,6 @@ func TestResourceLocation(t *testing.T) {
func TestDeletePod(t *testing.T) { func TestDeletePod(t *testing.T) {
fakeEtcdClient, helper := newHelper(t) fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.ChangeIndex = 1 fakeEtcdClient.ChangeIndex = 1
fakeEtcdClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "other",
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
},
},
}),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
}
fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{ fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
@ -719,15 +695,6 @@ func TestDeletePod(t *testing.T) {
if cache.clearedNamespace != "default" || cache.clearedName != "foo" { if cache.clearedNamespace != "default" || cache.clearedName != "foo" {
t.Fatalf("Unexpected cache delete: %s %s %#v", cache.clearedName, cache.clearedNamespace, result) t.Fatalf("Unexpected cache delete: %s %s %#v", cache.clearedName, cache.clearedNamespace, result)
} }
actual := &api.BoundPods{}
if err := helper.ExtractObj("/registry/nodes/machine/boundpods", actual, false); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// verify bound pods removes the correct namsepace
if len(actual.Items) != 1 || actual.Items[0].Namespace != "other" {
t.Errorf("bound pods should be empty: %#v", actual)
}
} }
// TestEtcdGetDifferentNamespace ensures same-name pods in different namespaces do not clash // TestEtcdGetDifferentNamespace ensures same-name pods in different namespaces do not clash
@ -811,7 +778,6 @@ func TestEtcdCreate(t *testing.T) {
}, },
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
_, err := registry.Create(ctx, validNewPod()) _, err := registry.Create(ctx, validNewPod())
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -839,16 +805,7 @@ func TestEtcdCreate(t *testing.T) {
if pod.Name != "foo" { if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
} }
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
} }
// Ensure that when scheduler creates a binding for a pod that has already been deleted // Ensure that when scheduler creates a binding for a pod that has already been deleted
@ -919,47 +876,6 @@ func TestEtcdCreateAlreadyExisting(t *testing.T) {
} }
} }
func TestEtcdCreateWithContainersError(t *testing.T) {
registry, bindingRegistry, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
key, _ := registry.store.KeyFunc(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors
}
_, err := registry.Create(ctx, validNewPod())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
_, err = bindingRegistry.Create(ctx, &api.Binding{
ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault, Name: "foo"},
Target: api.ObjectReference{Name: "machine"},
})
if !errors.IsAlreadyExists(err) {
t.Fatalf("Unexpected error returned: %#v", err)
}
obj, err := registry.Get(ctx, "foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
existingPod := obj.(*api.Pod)
if existingPod.Status.Host == "machine" {
t.Fatal("Pod's host changed in response to an non-apply-able binding.")
}
}
func TestEtcdCreateWithContainersNotFound(t *testing.T) { func TestEtcdCreateWithContainersNotFound(t *testing.T) {
registry, bindingRegistry, _, fakeClient, _ := newStorage(t) registry, bindingRegistry, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext() ctx := api.NewDefaultContext()
@ -971,12 +887,6 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
}, },
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
_, err := registry.Create(ctx, validNewPod()) _, err := registry.Create(ctx, validNewPod())
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -1004,16 +914,6 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
if pod.Name != "foo" { if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
} }
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
} }
func TestEtcdCreateWithExistingContainers(t *testing.T) { func TestEtcdCreateWithExistingContainers(t *testing.T) {
@ -1027,11 +927,6 @@ func TestEtcdCreateWithExistingContainers(t *testing.T) {
}, },
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
},
}), 0)
_, err := registry.Create(ctx, validNewPod()) _, err := registry.Create(ctx, validNewPod())
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -1059,16 +954,6 @@ func TestEtcdCreateWithExistingContainers(t *testing.T) {
if pod.Name != "foo" { if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
} }
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 2 || boundPods.Items[1].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
} }
func TestEtcdCreateBinding(t *testing.T) { func TestEtcdCreateBinding(t *testing.T) {
@ -1124,12 +1009,9 @@ func TestEtcdCreateBinding(t *testing.T) {
}, },
E: tools.EtcdErrorNotFound, E: tools.EtcdErrorNotFound,
} }
path := fmt.Sprintf("/registry/nodes/%v/boundpods", test.binding.Target.Name)
fakeClient.Set(path, runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
if _, err := registry.Create(ctx, validNewPod()); err != nil { if _, err := registry.Create(ctx, validNewPod()); err != nil {
t.Fatalf("%s: unexpected error: %v", k, err) t.Fatalf("%s: unexpected error: %v", k, err)
} }
fakeClient.Set(path, runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
if _, err := bindingRegistry.Create(ctx, &test.binding); !test.errOK(err) { if _, err := bindingRegistry.Create(ctx, &test.binding); !test.errOK(err) {
t.Errorf("%s: unexpected error: %v", k, err) t.Errorf("%s: unexpected error: %v", k, err)
} else if err == nil { } else if err == nil {
@ -1218,37 +1100,6 @@ func TestEtcdUpdateScheduled(t *testing.T) {
}, },
}), 1) }), 1)
contKey := "/registry/nodes/machine/boundpods"
fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "other",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
}, {
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
},
},
}), 0)
podIn := api.Pod{ podIn := api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
@ -1286,18 +1137,6 @@ func TestEtcdUpdateScheduled(t *testing.T) {
t.Errorf("expected: %#v, got: %#v", podOut, podIn) t.Errorf("expected: %#v, got: %#v", podOut, podIn)
} }
response, err = fakeClient.Get(contKey, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var list api.BoundPods
if err := latest.Codec.DecodeInto([]byte(response.Node.Value), &list); err != nil {
t.Fatalf("unexpected error decoding response: %v", err)
}
if len(list.Items) != 2 || !api.Semantic.DeepEqual(list.Items[1].Spec, podIn.Spec) {
t.Errorf("unexpected container list: %d\n items[0] - %#v\n podin.spec - %#v\n", len(list.Items), list.Items[0].Spec, podIn.Spec)
}
} }
func TestEtcdUpdateStatus(t *testing.T) { func TestEtcdUpdateStatus(t *testing.T) {
@ -1382,11 +1221,6 @@ func TestEtcdDeletePod(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"}, Status: api.PodStatus{Host: "machine"},
}), 0) }), 0)
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
},
}), 0)
_, err := registry.Delete(ctx, "foo") _, err := registry.Delete(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -1397,15 +1231,6 @@ func TestEtcdDeletePod(t *testing.T) {
} else if fakeClient.DeletedKeys[0] != key { } else if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 0 {
t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value)
}
} }
func TestEtcdDeletePodMultipleContainers(t *testing.T) { func TestEtcdDeletePodMultipleContainers(t *testing.T) {
@ -1417,12 +1242,6 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"}, Status: api.PodStatus{Host: "machine"},
}), 0) }), 0)
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
},
}), 0)
_, err := registry.Delete(ctx, "foo") _, err := registry.Delete(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -1434,18 +1253,6 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
if fakeClient.DeletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 1 {
t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods)
}
if boundPods.Items[0].Name != "bar" {
t.Errorf("Deleted wrong boundPod: %#v", boundPods)
}
} }
func TestEtcdEmptyList(t *testing.T) { func TestEtcdEmptyList(t *testing.T) {