mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 00:07:50 +00:00
TTL is not preserved automatically during edit
Change the signature of GuaranteedUpdate so that TTL can be more easily preserved. Allow a simpler (no ttl) and more complex (response and node directly available, set ttl) path for GuaranteedUpdate. Add some tests to ensure this doesn't blow up again.
This commit is contained in:
parent
d0daabb34b
commit
2b8e918ed9
@ -44,7 +44,7 @@ func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry {
|
|||||||
KeyFunc: func(ctx api.Context, id string) (string, error) {
|
KeyFunc: func(ctx api.Context, id string) (string, error) {
|
||||||
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, id)
|
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, id)
|
||||||
},
|
},
|
||||||
TTLFunc: func(runtime.Object, bool) (uint64, error) {
|
TTLFunc: func(runtime.Object, uint64, bool) (uint64, error) {
|
||||||
return ttl, nil
|
return ttl, nil
|
||||||
},
|
},
|
||||||
Helper: h,
|
Helper: h,
|
||||||
|
@ -72,8 +72,9 @@ type Etcd struct {
|
|||||||
ObjectNameFunc func(obj runtime.Object) (string, error)
|
ObjectNameFunc func(obj runtime.Object) (string, error)
|
||||||
|
|
||||||
// Return the TTL objects should be persisted with. Update is true if this
|
// Return the TTL objects should be persisted with. Update is true if this
|
||||||
// is an operation against an existing object.
|
// is an operation against an existing object. Existing is the current TTL
|
||||||
TTLFunc func(obj runtime.Object, update bool) (uint64, error)
|
// or the default for this operation.
|
||||||
|
TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)
|
||||||
|
|
||||||
// Returns a matcher corresponding to the provided labels and fields.
|
// Returns a matcher corresponding to the provided labels and fields.
|
||||||
PredicateFunc func(label labels.Selector, field fields.Selector) generic.Matcher
|
PredicateFunc func(label labels.Selector, field fields.Selector) generic.Matcher
|
||||||
@ -184,12 +185,9 @@ func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ttl := uint64(0)
|
ttl, err := e.calculateTTL(obj, 0, false)
|
||||||
if e.TTLFunc != nil {
|
if err != nil {
|
||||||
ttl, err = e.TTLFunc(obj, false)
|
return err
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
err = e.Helper.CreateObj(key, obj, nil, ttl)
|
err = e.Helper.CreateObj(key, obj, nil, ttl)
|
||||||
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
|
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
|
||||||
@ -214,12 +212,9 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ttl := uint64(0)
|
ttl, err := e.calculateTTL(obj, 0, false)
|
||||||
if e.TTLFunc != nil {
|
if err != nil {
|
||||||
ttl, err = e.TTLFunc(obj, false)
|
return nil, err
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
trace.Step("About to create object")
|
trace.Step("About to create object")
|
||||||
out := e.NewFunc()
|
out := e.NewFunc()
|
||||||
@ -249,12 +244,9 @@ func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ttl := uint64(0)
|
ttl, err := e.calculateTTL(obj, 0, true)
|
||||||
if e.TTLFunc != nil {
|
if err != nil {
|
||||||
ttl, err = e.TTLFunc(obj, true)
|
return err
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
err = e.Helper.SetObj(key, obj, nil, ttl)
|
err = e.Helper.SetObj(key, obj, nil, ttl)
|
||||||
err = etcderr.InterpretUpdateError(err, e.EndpointName, name)
|
err = etcderr.InterpretUpdateError(err, e.EndpointName, name)
|
||||||
@ -281,49 +273,46 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
|
|||||||
// TODO: expose TTL
|
// TODO: expose TTL
|
||||||
creating := false
|
creating := false
|
||||||
out := e.NewFunc()
|
out := e.NewFunc()
|
||||||
err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
|
err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object, res tools.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
|
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if version == 0 {
|
if version == 0 {
|
||||||
if !e.UpdateStrategy.AllowCreateOnUpdate() {
|
if !e.UpdateStrategy.AllowCreateOnUpdate() {
|
||||||
return nil, 0, kubeerr.NewNotFound(e.EndpointName, name)
|
return nil, nil, kubeerr.NewNotFound(e.EndpointName, name)
|
||||||
}
|
}
|
||||||
creating = true
|
creating = true
|
||||||
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
|
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
|
||||||
return nil, 0, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
ttl := uint64(0)
|
ttl, err := e.calculateTTL(obj, 0, false)
|
||||||
if e.TTLFunc != nil {
|
if err != nil {
|
||||||
ttl, err = e.TTLFunc(obj, false)
|
return nil, nil, err
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return obj, ttl, nil
|
return obj, &ttl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
creating = false
|
creating = false
|
||||||
newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
|
newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if newVersion != version {
|
if newVersion != version {
|
||||||
// TODO: return the most recent version to a client?
|
// TODO: return the most recent version to a client?
|
||||||
return nil, 0, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
|
return nil, nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
|
||||||
}
|
}
|
||||||
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
|
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
|
||||||
return nil, 0, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
ttl := uint64(0)
|
ttl, err := e.calculateTTL(obj, res.TTL, true)
|
||||||
if e.TTLFunc != nil {
|
if err != nil {
|
||||||
ttl, err = e.TTLFunc(obj, true)
|
return nil, nil, err
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return obj, ttl, nil
|
if int64(ttl) != res.TTL {
|
||||||
|
return obj, &ttl, nil
|
||||||
|
}
|
||||||
|
return obj, nil, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -479,3 +468,19 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
|
|||||||
|
|
||||||
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, filterFunc)
|
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, filterFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error
|
||||||
|
// if the TTL cannot be calculated. The defaultTTL is changed to 1 if less than zero. Zero means
|
||||||
|
// no TTL, not expire immediately.
|
||||||
|
func (e *Etcd) calculateTTL(obj runtime.Object, defaultTTL int64, update bool) (ttl uint64, err error) {
|
||||||
|
// etcd may return a negative TTL for a node if the expiration has not occured due
|
||||||
|
// to server lag - we will ensure that the value is at least set.
|
||||||
|
if defaultTTL < 0 {
|
||||||
|
defaultTTL = 1
|
||||||
|
}
|
||||||
|
ttl = uint64(defaultTTL)
|
||||||
|
if e.TTLFunc != nil {
|
||||||
|
ttl, err = e.TTLFunc(obj, ttl, update)
|
||||||
|
}
|
||||||
|
return ttl, err
|
||||||
|
}
|
||||||
|
@ -142,13 +142,16 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) {
|
err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, tools.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
|
||||||
pod, ok := obj.(*api.Pod)
|
pod, ok := obj.(*api.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, 0, fmt.Errorf("unexpected object: %#v", obj)
|
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
||||||
|
}
|
||||||
|
if pod.DeletionTimestamp != nil {
|
||||||
|
return nil, fmt.Errorf("pod %s is being deleted, cannot be assigned to a host", pod.Name)
|
||||||
}
|
}
|
||||||
if pod.Spec.NodeName != oldMachine {
|
if pod.Spec.NodeName != oldMachine {
|
||||||
return nil, 0, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName)
|
return nil, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName)
|
||||||
}
|
}
|
||||||
pod.Spec.NodeName = machine
|
pod.Spec.NodeName = machine
|
||||||
if pod.Annotations == nil {
|
if pod.Annotations == nil {
|
||||||
@ -158,8 +161,8 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin
|
|||||||
pod.Annotations[k] = v
|
pod.Annotations[k] = v
|
||||||
}
|
}
|
||||||
finalPod = pod
|
finalPod = pod
|
||||||
return pod, 0, nil
|
return pod, nil
|
||||||
})
|
}))
|
||||||
return finalPod, err
|
return finalPod, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,25 +141,25 @@ func (e *Etcd) Release(item int) error {
|
|||||||
// tryUpdate performs a read-update to persist the latest snapshot state of allocation.
|
// tryUpdate performs a read-update to persist the latest snapshot state of allocation.
|
||||||
func (e *Etcd) tryUpdate(fn func() error) error {
|
func (e *Etcd) tryUpdate(fn func() error) error {
|
||||||
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
|
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
|
||||||
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
|
tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) {
|
||||||
existing := input.(*api.RangeAllocation)
|
existing := input.(*api.RangeAllocation)
|
||||||
if len(existing.ResourceVersion) == 0 {
|
if len(existing.ResourceVersion) == 0 {
|
||||||
return nil, 0, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind)
|
return nil, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind)
|
||||||
}
|
}
|
||||||
if existing.ResourceVersion != e.last {
|
if existing.ResourceVersion != e.last {
|
||||||
if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
|
if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
|
||||||
return nil, 0, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := fn(); err != nil {
|
if err := fn(); err != nil {
|
||||||
return nil, 0, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
e.last = existing.ResourceVersion
|
e.last = existing.ResourceVersion
|
||||||
rangeSpec, data := e.alloc.Snapshot()
|
rangeSpec, data := e.alloc.Snapshot()
|
||||||
existing.Range = rangeSpec
|
existing.Range = rangeSpec
|
||||||
existing.Data = data
|
existing.Data = data
|
||||||
return existing, 0, nil
|
return existing, nil
|
||||||
},
|
}),
|
||||||
)
|
)
|
||||||
return etcderr.InterpretUpdateError(err, e.kind, "")
|
return etcderr.InterpretUpdateError(err, e.kind, "")
|
||||||
}
|
}
|
||||||
@ -198,19 +198,19 @@ func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error {
|
|||||||
|
|
||||||
last := ""
|
last := ""
|
||||||
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
|
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
|
||||||
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
|
tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) {
|
||||||
existing := input.(*api.RangeAllocation)
|
existing := input.(*api.RangeAllocation)
|
||||||
switch {
|
switch {
|
||||||
case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0:
|
case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0:
|
||||||
if snapshot.ResourceVersion != existing.ResourceVersion {
|
if snapshot.ResourceVersion != existing.ResourceVersion {
|
||||||
return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match"))
|
return nil, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match"))
|
||||||
}
|
}
|
||||||
case len(existing.ResourceVersion) != 0:
|
case len(existing.ResourceVersion) != 0:
|
||||||
return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource"))
|
return nil, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource"))
|
||||||
}
|
}
|
||||||
last = snapshot.ResourceVersion
|
last = snapshot.ResourceVersion
|
||||||
return snapshot, 0, nil
|
return snapshot, nil
|
||||||
},
|
}),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return etcderr.InterpretUpdateError(err, e.kind, "")
|
return etcderr.InterpretUpdateError(err, e.kind, "")
|
||||||
|
@ -339,23 +339,25 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
|
|||||||
// empty responses and nil response nodes exactly like a not found error.
|
// empty responses and nil response nodes exactly like a not found error.
|
||||||
func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
|
func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
|
||||||
key = h.PrefixEtcdKey(key)
|
key = h.PrefixEtcdKey(key)
|
||||||
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
_, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
|
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
|
||||||
|
// about the response, like the current etcd index and the ttl.
|
||||||
|
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
response, err := h.Client.Get(key, false, false)
|
response, err := h.Client.Get(key, false, false)
|
||||||
recordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
recordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||||
|
|
||||||
if err != nil && !IsEtcdNotFound(err) {
|
if err != nil && !IsEtcdNotFound(err) {
|
||||||
return "", 0, err
|
return "", nil, nil, err
|
||||||
}
|
}
|
||||||
return h.extractObj(response, err, objPtr, ignoreNotFound, false)
|
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
|
||||||
|
return body, node, response, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, modifiedIndex uint64, err error) {
|
func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
|
||||||
var node *etcd.Node
|
|
||||||
if response != nil {
|
if response != nil {
|
||||||
if prevNode {
|
if prevNode {
|
||||||
node = response.PrevNode
|
node = response.PrevNode
|
||||||
@ -367,14 +369,14 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
|||||||
if ignoreNotFound {
|
if ignoreNotFound {
|
||||||
v, err := conversion.EnforcePtr(objPtr)
|
v, err := conversion.EnforcePtr(objPtr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", 0, err
|
return "", nil, err
|
||||||
}
|
}
|
||||||
v.Set(reflect.Zero(v.Type()))
|
v.Set(reflect.Zero(v.Type()))
|
||||||
return "", 0, nil
|
return "", nil, nil
|
||||||
} else if inErr != nil {
|
} else if inErr != nil {
|
||||||
return "", 0, inErr
|
return "", nil, inErr
|
||||||
}
|
}
|
||||||
return "", 0, fmt.Errorf("unable to locate a value on the response: %#v", response)
|
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
|
||||||
}
|
}
|
||||||
body = node.Value
|
body = node.Value
|
||||||
err = h.Codec.DecodeInto([]byte(body), objPtr)
|
err = h.Codec.DecodeInto([]byte(body), objPtr)
|
||||||
@ -382,7 +384,7 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
|||||||
_ = h.Versioner.UpdateObject(objPtr, node)
|
_ = h.Versioner.UpdateObject(objPtr, node)
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
}
|
}
|
||||||
return body, node.ModifiedIndex, err
|
return body, node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds,
|
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds,
|
||||||
@ -486,9 +488,28 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResponseMeta contains information about the etcd metadata that is associated with
|
||||||
|
// an object. It abstracts the actual underlying objects to prevent coupling with etcd
|
||||||
|
// and to improve testability.
|
||||||
|
type ResponseMeta struct {
|
||||||
|
// TTL is the time to live of the node that contained the returned object. It may be
|
||||||
|
// zero or negative in some cases (objects may be expired after the requested
|
||||||
|
// expiration time due to server lag).
|
||||||
|
TTL int64
|
||||||
|
}
|
||||||
|
|
||||||
// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed.
|
// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed.
|
||||||
// See the comment for GuaranteedUpdate for more detail.
|
// See the comment for GuaranteedUpdate for more detail.
|
||||||
type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error)
|
type EtcdUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
|
||||||
|
type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error)
|
||||||
|
|
||||||
|
// SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc
|
||||||
|
func SimpleUpdate(fn SimpleEtcdUpdateFunc) EtcdUpdateFunc {
|
||||||
|
return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
out, err := fn(input)
|
||||||
|
return out, nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps
|
// GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps
|
||||||
// calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object
|
// calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object
|
||||||
@ -499,7 +520,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint6
|
|||||||
// Example:
|
// Example:
|
||||||
//
|
//
|
||||||
// h := &util.EtcdHelper{client, encoding, versioning}
|
// h := &util.EtcdHelper{client, encoding, versioning}
|
||||||
// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) {
|
// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
// // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey".
|
// // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey".
|
||||||
//
|
//
|
||||||
// cur := input.(*MyType) // Guaranteed to succeed.
|
// cur := input.(*MyType) // Guaranteed to succeed.
|
||||||
@ -507,9 +528,9 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint6
|
|||||||
// // Make a *modification*.
|
// // Make a *modification*.
|
||||||
// cur.Counter++
|
// cur.Counter++
|
||||||
//
|
//
|
||||||
// // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set
|
// // Return the modified object. Return an error to stop iterating. Return a uint64 to alter
|
||||||
// // the TTL on the object.
|
// // the TTL on the object, or nil to keep it the same value.
|
||||||
// return cur, 0, nil
|
// return cur, nil, nil
|
||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
|
func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
|
||||||
@ -521,14 +542,33 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
|
|||||||
key = h.PrefixEtcdKey(key)
|
key = h.PrefixEtcdKey(key)
|
||||||
for {
|
for {
|
||||||
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
||||||
origBody, index, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
|
origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
meta := ResponseMeta{}
|
||||||
|
if node != nil {
|
||||||
|
meta.TTL = node.TTL
|
||||||
|
}
|
||||||
|
|
||||||
|
ret, newTTL, err := tryUpdate(obj, meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ret, ttl, err := tryUpdate(obj)
|
index := uint64(0)
|
||||||
if err != nil {
|
ttl := uint64(0)
|
||||||
return err
|
if node != nil {
|
||||||
|
index = node.ModifiedIndex
|
||||||
|
if node.TTL > 0 {
|
||||||
|
ttl = uint64(node.TTL)
|
||||||
|
}
|
||||||
|
} else if res != nil {
|
||||||
|
index = res.EtcdIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
if newTTL != nil {
|
||||||
|
ttl = *newTTL
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := h.Codec.Encode(ret)
|
data, err := h.Codec.Encode(ret)
|
||||||
|
@ -529,9 +529,9 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet(key)
|
fakeClient.ExpectNotFoundGet(key)
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
return obj, 0, nil
|
return obj, nil
|
||||||
})
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -548,15 +548,15 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
// Update an existing node.
|
// Update an existing node.
|
||||||
callbackCalled := false
|
callbackCalled := false
|
||||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
||||||
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
callbackCalled = true
|
callbackCalled = true
|
||||||
|
|
||||||
if in.(*TestResource).Value != 1 {
|
if in.(*TestResource).Value != 1 {
|
||||||
t.Errorf("Callback input was not current set value")
|
t.Errorf("Callback input was not current set value")
|
||||||
}
|
}
|
||||||
|
|
||||||
return objUpdate, 0, nil
|
return objUpdate, nil
|
||||||
})
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -575,6 +575,107 @@ func TestGuaranteedUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGuaranteedUpdateTTL(t *testing.T) {
|
||||||
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
|
fakeClient.TestIndex = true
|
||||||
|
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||||
|
key := etcdtest.AddPrefix("/some/key")
|
||||||
|
|
||||||
|
// Create a new node.
|
||||||
|
fakeClient.ExpectNotFoundGet(key)
|
||||||
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
if res.TTL != 0 {
|
||||||
|
t.Fatalf("unexpected response meta: %#v", res)
|
||||||
|
}
|
||||||
|
ttl := uint64(10)
|
||||||
|
return obj, &ttl, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
data, err := codec.Encode(obj)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
expect := string(data)
|
||||||
|
got := fakeClient.Data[key].R.Node.Value
|
||||||
|
if expect != got {
|
||||||
|
t.Errorf("Wanted %v, got %v", expect, got)
|
||||||
|
}
|
||||||
|
if fakeClient.Data[key].R.Node.TTL != 10 {
|
||||||
|
t.Errorf("expected TTL set: %d", fakeClient.Data[key].R.Node.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update an existing node.
|
||||||
|
callbackCalled := false
|
||||||
|
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
|
||||||
|
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
if res.TTL != 10 {
|
||||||
|
t.Fatalf("unexpected response meta: %#v", res)
|
||||||
|
}
|
||||||
|
callbackCalled = true
|
||||||
|
|
||||||
|
if in.(*TestResource).Value != 1 {
|
||||||
|
t.Errorf("Callback input was not current set value")
|
||||||
|
}
|
||||||
|
|
||||||
|
return objUpdate, nil, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
data, err = codec.Encode(objUpdate)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
expect = string(data)
|
||||||
|
got = fakeClient.Data[key].R.Node.Value
|
||||||
|
if expect != got {
|
||||||
|
t.Errorf("Wanted %v, got %v", expect, got)
|
||||||
|
}
|
||||||
|
if fakeClient.Data[key].R.Node.TTL != 10 {
|
||||||
|
t.Errorf("expected TTL remained set: %d", fakeClient.Data[key].R.Node.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update an existing node and change ttl
|
||||||
|
callbackCalled = false
|
||||||
|
objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3}
|
||||||
|
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
if res.TTL != 10 {
|
||||||
|
t.Fatalf("unexpected response meta: %#v", res)
|
||||||
|
}
|
||||||
|
callbackCalled = true
|
||||||
|
|
||||||
|
if in.(*TestResource).Value != 2 {
|
||||||
|
t.Errorf("Callback input was not current set value")
|
||||||
|
}
|
||||||
|
|
||||||
|
newTTL := uint64(20)
|
||||||
|
return objUpdate, &newTTL, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
data, err = codec.Encode(objUpdate)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
expect = string(data)
|
||||||
|
got = fakeClient.Data[key].R.Node.Value
|
||||||
|
if expect != got {
|
||||||
|
t.Errorf("Wanted %v, got %v", expect, got)
|
||||||
|
}
|
||||||
|
if fakeClient.Data[key].R.Node.TTL != 20 {
|
||||||
|
t.Errorf("expected TTL changed: %d", fakeClient.Data[key].R.Node.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !callbackCalled {
|
||||||
|
t.Errorf("tryUpdate callback should have been called.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateNoChange(t *testing.T) {
|
func TestGuaranteedUpdateNoChange(t *testing.T) {
|
||||||
fakeClient := NewFakeEtcdClient(t)
|
fakeClient := NewFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
@ -584,9 +685,9 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
|
|||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet(key)
|
fakeClient.ExpectNotFoundGet(key)
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
return obj, 0, nil
|
return obj, nil
|
||||||
})
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -594,11 +695,11 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
|
|||||||
// Update an existing node with the same data
|
// Update an existing node with the same data
|
||||||
callbackCalled := false
|
callbackCalled := false
|
||||||
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
fakeClient.Err = errors.New("should not be called")
|
fakeClient.Err = errors.New("should not be called")
|
||||||
callbackCalled = true
|
callbackCalled = true
|
||||||
return objUpdate, 0, nil
|
return objUpdate, nil
|
||||||
})
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %#v", err)
|
t.Fatalf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -617,9 +718,9 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
|||||||
fakeClient.ExpectNotFoundGet(key)
|
fakeClient.ExpectNotFoundGet(key)
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||||
|
|
||||||
f := func(in runtime.Object) (runtime.Object, uint64, error) {
|
f := SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
return obj, 0, nil
|
return obj, nil
|
||||||
}
|
})
|
||||||
|
|
||||||
ignoreNotFound := false
|
ignoreNotFound := false
|
||||||
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
|
||||||
@ -654,7 +755,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
|||||||
defer wgDone.Done()
|
defer wgDone.Done()
|
||||||
|
|
||||||
firstCall := true
|
firstCall := true
|
||||||
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
|
||||||
defer func() { firstCall = false }()
|
defer func() { firstCall = false }()
|
||||||
|
|
||||||
if firstCall {
|
if firstCall {
|
||||||
@ -665,8 +766,8 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
|||||||
|
|
||||||
currValue := in.(*TestResource).Value
|
currValue := in.(*TestResource).Value
|
||||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1}
|
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1}
|
||||||
return obj, 0, nil
|
return obj, nil
|
||||||
})
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user