From 529870d12150b05387dbb18c51c62523f69df2a1 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 27 Jun 2014 10:55:05 -0700 Subject: [PATCH] Generic atomic update code --- pkg/registry/etcd_registry.go | 8 +-- pkg/util/etcd_tools.go | 92 ++++++++++++++++++++++++++++------- pkg/util/etcd_tools_test.go | 6 +-- 3 files changed, 81 insertions(+), 25 deletions(-) diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index b2a7a68c898..0372aa3da2f 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -91,7 +91,7 @@ func makeContainerKey(machine string) string { } func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) { - err, index = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true) + index, err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true) return manifests, index, err } @@ -190,7 +190,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { key := makePodKey(machine, podID) - err, _ = registry.helper().ExtractObj(key, &pod, false) + _, err = registry.helper().ExtractObj(key, &pod, false) if err != nil { return } @@ -225,7 +225,7 @@ func makeControllerKey(id string) string { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - err, _ := registry.helper().ExtractObj(key, &controller, false) + _, err := registry.helper().ExtractObj(key, &controller, false) if err != nil { return nil, err } @@ -264,7 +264,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error { func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - err, _ := registry.helper().ExtractObj(key, &svc, false) + _, err := registry.helper().ExtractObj(key, &svc, false) if err != nil { return nil, err } diff --git a/pkg/util/etcd_tools.go b/pkg/util/etcd_tools.go index 26b2641939c..aeb2d45bc06 100644 --- a/pkg/util/etcd_tools.go +++ b/pkg/util/etcd_tools.go @@ -116,26 +116,13 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { // Unmarshals json found at key into objPtr. On a not found error, will either return // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. -func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (error, uint64) { - response, err := h.Client.Get(key, false, false) - - if err != nil && !IsEtcdNotFound(err) { - return err, 0 - } - if err != nil || response.Node == nil || len(response.Node.Value) == 0 { - if ignoreNotFound { - pv := reflect.ValueOf(objPtr) - pv.Elem().Set(reflect.Zero(pv.Type().Elem())) - return nil, 0 - } else if err != nil { - return err, 0 - } - return fmt.Errorf("key '%v' found no nodes field: %#v", key, response), 0 - } - return json.Unmarshal([]byte(response.Node.Value), objPtr), response.Node.ModifiedIndex +func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (modifiedIndex uint64, err error) { + _, modifiedIndex, err = h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + return modifiedIndex, err } -// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches the previous modified index +// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches +// the previous modified index. func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error { data, err := json.Marshal(obj) if err != nil { @@ -145,6 +132,32 @@ func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64 return err } +/* +func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { + _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + return err +}*/ + +func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { + response, err := h.Client.Get(key, false, false) + + if err != nil && !IsEtcdNotFound(err) { + return "", 0, err + } + if err != nil || response.Node == nil || len(response.Node.Value) == 0 { + if ignoreNotFound { + pv := reflect.ValueOf(objPtr) + pv.Elem().Set(reflect.Zero(pv.Type().Elem())) + return "", 0, nil + } else if err != nil { + return "", 0, err + } + return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) + } + body = response.Node.Value + return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr) +} + // SetObj marshals obj via json, and stores under key. func (h *EtcdHelper) SetObj(key string, obj interface{}) error { data, err := json.Marshal(obj) @@ -154,3 +167,46 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error { _, err = h.Client.Set(key, string(data), 0) return err } + +// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. +// See the comment for AtomicUpdate for more detail. +type EtcdUpdateFunc func() (interface{}, error) + +// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. +// Note, tryUpdate may be called more than once. +// +// Example: +// +// h := &util.EtcdHelper{client} +// var currentObj MyType +// err := h.AtomicUpdate("myKey", ¤tObj, func() (interface{}, error) { +// // Before this function is called, currentObj has been reset to etcd's current +// // contents for "myKey". +// +// // Make a *modification*. +// currentObj.Counter++ +// +// // Return the modified object. Return an error to stop iterating. +// return currentObj, nil +// }) +// +func (h *EtcdHelper) AtomicUpdate(key string, objPtr interface{}, tryUpdate EtcdUpdateFunc) error { + for { + origBody, index, err := h.bodyAndExtractObj(key, objPtr, true) + if err != nil { + return err + } + + ret, err := tryUpdate() + if err != nil { + return err + } + + data, err := json.Marshal(ret) + if err != nil { + return err + } + _, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index) + return err + } +} diff --git a/pkg/util/etcd_tools_test.go b/pkg/util/etcd_tools_test.go index a322e504216..4355dcd66b9 100644 --- a/pkg/util/etcd_tools_test.go +++ b/pkg/util/etcd_tools_test.go @@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) { fakeClient.Set("/some/key", MakeJSONString(expect), 0) helper := EtcdHelper{fakeClient} var got testMarshalType - err, _ := helper.ExtractObj("/some/key", &got, false) + _, err := helper.ExtractObj("/some/key", &got, false) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) { helper := EtcdHelper{fakeClient} try := func(key string) { var got testMarshalType - err, _ := helper.ExtractObj(key, &got, false) + _, err := helper.ExtractObj(key, &got, false) if err == nil { t.Errorf("%s: wanted error but didn't get one", key) } - err, _ = helper.ExtractObj(key, &got, true) + _, err = helper.ExtractObj(key, &got, true) if err != nil { t.Errorf("%s: didn't want error but got %#v", key, err) }