From ab4918464e8e7a6359815d29541b59fbd6521422 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 1 Aug 2014 14:02:37 -0700 Subject: [PATCH] Switched Set to Create --- pkg/registry/etcdregistry.go | 3 ++- pkg/registry/etcdregistry_test.go | 27 ++++++++++++++++++++++----- pkg/tools/etcd_tools.go | 11 +++++------ 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index 8dd6089a284..3ce50ddcd13 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -311,5 +311,6 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error { // UpdateEndpoints update Endpoints of a Service. func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { - return registry.helper.SetObj("/registry/services/endpoints/"+e.ID, e) + updateFunc := func(interface{}) (interface{}, error) { return e, nil } + return registry.helper.AtomicUpdate("/registry/services/endpoints/"+e.ID, &api.Endpoints{}, updateFunc) } diff --git a/pkg/registry/etcdregistry_test.go b/pkg/registry/etcdregistry_test.go index 51d838caab5..e033d9aca72 100644 --- a/pkg/registry/etcdregistry_test.go +++ b/pkg/registry/etcdregistry_test.go @@ -66,6 +66,7 @@ func TestEtcdGetPodNotFound(t *testing.T) { func TestEtcdCreatePod(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.TestIndex = true fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, @@ -231,6 +232,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { func TestEtcdCreatePodWithExistingContainers(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.TestIndex = true fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, @@ -289,6 +291,8 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { func TestEtcdDeletePod(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.TestIndex = true + key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ @@ -320,6 +324,8 @@ func TestEtcdDeletePod(t *testing.T) { func TestEtcdDeletePodMultipleContainers(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.TestIndex = true + key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ @@ -570,10 +576,12 @@ func TestEtcdCreateController(t *testing.T) { func TestEtcdUpdateController(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) + fakeClient.TestIndex = true + + resp, _ := fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.UpdateController(api.ReplicationController{ - JSONBase: api.JSONBase{ID: "foo"}, + JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, DesiredState: api.ReplicationControllerState{ Replicas: 2, }, @@ -701,10 +709,12 @@ func TestEtcdDeleteService(t *testing.T) { func TestEtcdUpdateService(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) + fakeClient.TestIndex = true + + resp, _ := fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) testService := api.Service{ - JSONBase: api.JSONBase{ID: "foo"}, + JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, Labels: map[string]string{ "baz": "bar", }, @@ -722,18 +732,25 @@ func TestEtcdUpdateService(t *testing.T) { t.Errorf("unexpected error: %v", err) } + // Clear modified indices before the equality test. + svc.ResourceVersion = 0 + testService.ResourceVersion = 0 if !reflect.DeepEqual(*svc, testService) { - t.Errorf("Unexpected service: got %#v, wanted %#v", svc, testService) + t.Errorf("Unexpected service: got\n %#v\n, wanted\n %#v", svc, testService) } } func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.TestIndex = true registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) endpoints := api.Endpoints{ JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"baz", "bar"}, } + + fakeClient.Set("/registry/services/endpoints/foo", util.MakeJSONString(api.Endpoints{}), 0) + err := registry.UpdateEndpoints(endpoints) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index b186cdec5d2..52c68f2ec6e 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -69,6 +69,7 @@ type EtcdClient interface { type EtcdGetSet interface { Get(key string, sort, recursive bool) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error) + Create(key, value string, ttl uint64) (*etcd.Response, error) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) } @@ -185,8 +186,8 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error { } } - // TODO: when client supports atomic creation, integrate this with the above. - _, err = h.Client.Set(key, string(data), 0) + // Create will fail if a key already exists. + _, err = h.Client.Create(key, string(data), 0) return err } @@ -232,8 +233,6 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E } // First time this key has been used, just set. - // TODO: This is racy. Fix when our client supports prevExist. See: - // https://github.com/coreos/etcd/blob/master/Documentation/api.md#atomic-compare-and-swap if index == 0 { return h.SetObj(key, ret) } @@ -350,7 +349,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { var action watch.EventType var data []byte switch res.Action { - case "set": + case "create", "set": if res.Node == nil { glog.Errorf("unexpected nil node: %#v", res) return @@ -376,7 +375,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { obj, err := w.encoding.Decode(data) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res) + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node) // TODO: expose an error through watch.Interface? w.Stop() return