From 6b28a69a1b08ac59c284c1b02c8038b1d048ee7a Mon Sep 17 00:00:00 2001 From: David Oppenheimer Date: Sun, 12 Apr 2015 15:22:04 -0700 Subject: [PATCH] Clarify comments describing how GuaranteedUpdate() (previously AtomicUpdate() works. Closes #6626. --- pkg/apiserver/resthandler.go | 2 +- pkg/registry/generic/etcd/etcd.go | 2 +- pkg/registry/pod/etcd/etcd.go | 2 +- pkg/tools/etcd_helper.go | 22 ++++++++++++---------- pkg/tools/etcd_helper_test.go | 24 ++++++++++++------------ 5 files changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index c5f98807c21..e92f7b30a83 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -244,7 +244,7 @@ func CreateResource(r rest.Creater, scope RequestScope, typer runtime.ObjectType } // PatchResource returns a function that will handle a resource patch -// TODO: Eventually PatchResource should just use AtomicUpdate and this routine should be a bit cleaner +// TODO: Eventually PatchResource should just use GuaranteedUpdate and this routine should be a bit cleaner func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface, converter runtime.ObjectConvertor) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { w := res.ResponseWriter diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 736bdeed891..b354ad4c160 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -255,7 +255,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool // TODO: expose TTL creating := false out := e.NewFunc() - err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) { + err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) { version, err := e.Helper.Versioner.ObjectResourceVersion(existing) if err != nil { return nil, 0, err diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 990265464f4..90ba9728515 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -135,7 +135,7 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin if err != nil { return nil, err } - err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) { + err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, 0, fmt.Errorf("unexpected object: %#v", obj) diff --git a/pkg/tools/etcd_helper.go b/pkg/tools/etcd_helper.go index 124e5dbaa4a..4c5e289d760 100644 --- a/pkg/tools/etcd_helper.go +++ b/pkg/tools/etcd_helper.go @@ -284,31 +284,33 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err return err } -// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. -// See the comment for AtomicUpdate for more detail. +// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed. +// See the comment for GuaranteedUpdate for more detail. type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error) -// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. -// Note, tryUpdate may be called more than once. +// GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps +// calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object +// passed to tryUpdate() may change across invocations of tryUpdate() if other writers are simultaneously +// updating it, so tryUpdate() needs to take into account the current contents of the object when +// deciding how the updated object (that it returns) should look. // // Example: // // h := &util.EtcdHelper{client, encoding, versioning} -// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) { -// // Before this function is called, currentObj has been reset to etcd's current -// // contents for "myKey". +// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) { +// // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey". // -// cur := input.(*MyType) // Guaranteed to work. +// cur := input.(*MyType) // Guaranteed to succeed. // // // Make a *modification*. // cur.Counter++ // // // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set -// // the TTL on the object. +// // the TTL on the object. // return cur, 0, nil // }) // -func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error { +func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error { v, err := conversion.EnforcePtr(ptrToType) if err != nil { // Panic is appropriate, because this is a programming error. diff --git a/pkg/tools/etcd_helper_test.go b/pkg/tools/etcd_helper_test.go index caaa632defb..2450cede80c 100644 --- a/pkg/tools/etcd_helper_test.go +++ b/pkg/tools/etcd_helper_test.go @@ -503,7 +503,7 @@ func TestSetObjNilOutParam(t *testing.T) { } } -func TestAtomicUpdate(t *testing.T) { +func TestGuaranteedUpdate(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := NewEtcdHelper(fakeClient, codec) @@ -511,7 +511,7 @@ func TestAtomicUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { return obj, 0, nil }) if err != nil { @@ -530,7 +530,7 @@ func TestAtomicUpdate(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { callbackCalled = true if in.(*TestResource).Value != 1 { @@ -557,7 +557,7 @@ func TestAtomicUpdate(t *testing.T) { } } -func TestAtomicUpdateNoChange(t *testing.T) { +func TestGuaranteedUpdateNoChange(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := NewEtcdHelper(fakeClient, codec) @@ -565,7 +565,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { return obj, 0, nil }) if err != nil { @@ -575,7 +575,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { // Update an existing node with the same data callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { fakeClient.Err = errors.New("should not be called") callbackCalled = true return objUpdate, 0, nil @@ -588,7 +588,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { } } -func TestAtomicUpdateKeyNotFound(t *testing.T) { +func TestGuaranteedUpdateKeyNotFound(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := NewEtcdHelper(fakeClient, codec) @@ -602,19 +602,19 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) { } ignoreNotFound := false - err := helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f) + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f) if err == nil { t.Errorf("Expected error for key not found.") } ignoreNotFound = true - err = helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f) + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f) if err != nil { t.Errorf("Unexpected error %v.", err) } } -func TestAtomicUpdate_CreateCollision(t *testing.T) { +func TestGuaranteedUpdate_CreateCollision(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := NewEtcdHelper(fakeClient, codec) @@ -633,11 +633,11 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { defer wgDone.Done() firstCall := true - err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { defer func() { firstCall = false }() if firstCall { - // Force collision by joining all concurrent AtomicUpdate operations here. + // Force collision by joining all concurrent GuaranteedUpdate operations here. wgForceCollision.Done() wgForceCollision.Wait() }