mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 18:54:06 +00:00
Merge pull request #6746 from davidopp/master
Clarify comments describing how GuaranteedUpdate() (previously AtomicUpd...
This commit is contained in:
commit
d7a1965fd9
@ -244,7 +244,7 @@ func CreateResource(r rest.Creater, scope RequestScope, typer runtime.ObjectType
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PatchResource returns a function that will handle a resource patch
|
// PatchResource returns a function that will handle a resource patch
|
||||||
// TODO: Eventually PatchResource should just use AtomicUpdate and this routine should be a bit cleaner
|
// TODO: Eventually PatchResource should just use GuaranteedUpdate and this routine should be a bit cleaner
|
||||||
func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface, converter runtime.ObjectConvertor) restful.RouteFunction {
|
func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface, converter runtime.ObjectConvertor) restful.RouteFunction {
|
||||||
return func(req *restful.Request, res *restful.Response) {
|
return func(req *restful.Request, res *restful.Response) {
|
||||||
w := res.ResponseWriter
|
w := res.ResponseWriter
|
||||||
|
@ -255,7 +255,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
|
|||||||
// TODO: expose TTL
|
// TODO: expose TTL
|
||||||
creating := false
|
creating := false
|
||||||
out := e.NewFunc()
|
out := e.NewFunc()
|
||||||
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
|
err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
|
||||||
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
|
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
|
@ -135,7 +135,7 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) {
|
err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) {
|
||||||
pod, ok := obj.(*api.Pod)
|
pod, ok := obj.(*api.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, 0, fmt.Errorf("unexpected object: %#v", obj)
|
return nil, 0, fmt.Errorf("unexpected object: %#v", obj)
|
||||||
|
@ -284,31 +284,33 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
|
// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed.
|
||||||
// See the comment for AtomicUpdate for more detail.
|
// See the comment for GuaranteedUpdate for more detail.
|
||||||
type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error)
|
type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error)
|
||||||
|
|
||||||
// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
|
// GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps
|
||||||
// Note, tryUpdate may be called more than once.
|
// calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object
|
||||||
|
// passed to tryUpdate() may change across invocations of tryUpdate() if other writers are simultaneously
|
||||||
|
// updating it, so tryUpdate() needs to take into account the current contents of the object when
|
||||||
|
// deciding how the updated object (that it returns) should look.
|
||||||
//
|
//
|
||||||
// Example:
|
// Example:
|
||||||
//
|
//
|
||||||
// h := &util.EtcdHelper{client, encoding, versioning}
|
// h := &util.EtcdHelper{client, encoding, versioning}
|
||||||
// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) {
|
// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) {
|
||||||
// // Before this function is called, currentObj has been reset to etcd's current
|
// // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey".
|
||||||
// // contents for "myKey".
|
|
||||||
//
|
//
|
||||||
// cur := input.(*MyType) // Guaranteed to work.
|
// cur := input.(*MyType) // Guaranteed to succeed.
|
||||||
//
|
//
|
||||||
// // Make a *modification*.
|
// // Make a *modification*.
|
||||||
// cur.Counter++
|
// cur.Counter++
|
||||||
//
|
//
|
||||||
// // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set
|
// // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set
|
||||||
// // the TTL on the object.
|
// // the TTL on the object.
|
||||||
// return cur, 0, nil
|
// return cur, 0, nil
|
||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
|
func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
|
||||||
v, err := conversion.EnforcePtr(ptrToType)
|
v, err := conversion.EnforcePtr(ptrToType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Panic is appropriate, because this is a programming error.
|
// Panic is appropriate, because this is a programming error.
|
||||||
|
@ -503,7 +503,7 @@ func TestSetObjNilOutParam(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAtomicUpdate(t *testing.T) {
|
func TestGuaranteedUpdate(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
helper := NewEtcdHelper(fakeClient, codec)
|
helper := NewEtcdHelper(fakeClient, codec)
|
||||||
@ -511,7 +511,7 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet("/some/key")
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||||
return obj, 0, nil
|
return obj, 0, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -530,7 +530,7 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
// Update an existing node.
|
// Update an existing node.
|
||||||
callbackCalled := false
|
callbackCalled := false
|
||||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
||||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||||
callbackCalled = true
|
callbackCalled = true
|
||||||
|
|
||||||
if in.(*TestResource).Value != 1 {
|
if in.(*TestResource).Value != 1 {
|
||||||
@ -557,7 +557,7 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAtomicUpdateNoChange(t *testing.T) {
|
func TestGuaranteedUpdateNoChange(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
helper := NewEtcdHelper(fakeClient, codec)
|
helper := NewEtcdHelper(fakeClient, codec)
|
||||||
@ -565,7 +565,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
|||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet("/some/key")
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||||
return obj, 0, nil
|
return obj, 0, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -575,7 +575,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
|||||||
// Update an existing node with the same data
|
// Update an existing node with the same data
|
||||||
callbackCalled := false
|
callbackCalled := false
|
||||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||||
fakeClient.Err = errors.New("should not be called")
|
fakeClient.Err = errors.New("should not be called")
|
||||||
callbackCalled = true
|
callbackCalled = true
|
||||||
return objUpdate, 0, nil
|
return objUpdate, 0, nil
|
||||||
@ -588,7 +588,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAtomicUpdateKeyNotFound(t *testing.T) {
|
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
helper := NewEtcdHelper(fakeClient, codec)
|
helper := NewEtcdHelper(fakeClient, codec)
|
||||||
@ -602,19 +602,19 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ignoreNotFound := false
|
ignoreNotFound := false
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Expected error for key not found.")
|
t.Errorf("Expected error for key not found.")
|
||||||
}
|
}
|
||||||
|
|
||||||
ignoreNotFound = true
|
ignoreNotFound = true
|
||||||
err = helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v.", err)
|
t.Errorf("Unexpected error %v.", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
helper := NewEtcdHelper(fakeClient, codec)
|
helper := NewEtcdHelper(fakeClient, codec)
|
||||||
@ -633,11 +633,11 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
|||||||
defer wgDone.Done()
|
defer wgDone.Done()
|
||||||
|
|
||||||
firstCall := true
|
firstCall := true
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||||
defer func() { firstCall = false }()
|
defer func() { firstCall = false }()
|
||||||
|
|
||||||
if firstCall {
|
if firstCall {
|
||||||
// Force collision by joining all concurrent AtomicUpdate operations here.
|
// Force collision by joining all concurrent GuaranteedUpdate operations here.
|
||||||
wgForceCollision.Done()
|
wgForceCollision.Done()
|
||||||
wgForceCollision.Wait()
|
wgForceCollision.Wait()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user