diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index c810c8d608c..ae64f741dcb 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -462,7 +462,21 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p } // Implements storage.Interface. -func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc) error { +func (c *Cacher) GuaranteedUpdate( + ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, + preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error { + // Ignore the suggestion and try to pass down the current version of the object + // read from cache. + if elem, exists, err := c.watchCache.GetByKey(key); err != nil { + glog.Errorf("GetByKey returned error: %v", err) + } else if exists { + currObj, copyErr := api.Scheme.Copy(elem.(*storeElement).Object) + if copyErr == nil { + return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj) + } + glog.Errorf("couldn't copy object: %v", copyErr) + } + // If we couldn't get the object, fallback to no-suggestion. return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate) } diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index b01a5e15f51..a7245905c03 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -434,7 +434,10 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node } // Implements storage.Interface. -func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error { +func (h *etcdHelper) GuaranteedUpdate( + ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, + preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error { + // Ignore the suggestion about current object. if ctx == nil { glog.Errorf("Context is nil") } diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 2566450fbf4..571623ca4b3 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -211,22 +211,33 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O } // GuaranteedUpdate implements storage.Interface.GuaranteedUpdate. -func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error { +func (s *store) GuaranteedUpdate( + ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, + precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error { v, err := conversion.EnforcePtr(out) if err != nil { panic("unable to convert output object to pointer") } key = keyWithPrefix(s.pathPrefix, key) - getResp, err := s.client.KV.Get(ctx, key, s.getOps...) - if err != nil { - return err - } - for { - origState, err := s.getState(getResp, key, v, ignoreNotFound) + + var origState *objState + if len(suggestion) == 1 && suggestion[0] != nil { + origState, err = s.getStateFromObject(suggestion[0]) if err != nil { return err } + } else { + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + if err != nil { + return err + } + origState, err = s.getState(getResp, key, v, ignoreNotFound) + if err != nil { + return err + } + } + for { if err := checkPreconditions(key, precondtions, origState.obj); err != nil { return err } @@ -260,8 +271,12 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob return err } if !txnResp.Succeeded { - getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) + getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key) + origState, err = s.getState(getResp, key, v, ignoreNotFound) + if err != nil { + return err + } continue } putResp := txnResp.Responses[0].GetResponsePut() @@ -369,6 +384,32 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va return state, nil } +func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) { + state := &objState{ + obj: obj, + meta: &storage.ResponseMeta{}, + } + + rv, err := s.versioner.ObjectResourceVersion(obj) + if err != nil { + return nil, fmt.Errorf("couldn't get resource version: %v", err) + } + state.rev = int64(rv) + state.meta.ResourceVersion = uint64(state.rev) + + // Compute the serialized form - for that we need to temporarily clean + // its resource version field (those are not stored in etcd). + if err := s.versioner.UpdateObject(obj, 0); err != nil { + return nil, errors.New("resourceVersion cannot be set on objects store in etcd") + } + state.data, err = runtime.Encode(s.codec, obj) + if err != nil { + return nil, err + } + s.versioner.UpdateObject(state.obj, uint64(rv)) + return state, nil +} + func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) { ret, ttlPtr, err := userUpdate(st.obj, *st.meta) if err != nil { diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index c6a0fdbc82b..612d3741b7b 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -147,6 +147,9 @@ type Interface interface { // or zero value in 'ptrToType' parameter otherwise. // If the object to update has the same value as previous, it won't do any update // but will return the object in 'ptrToType' parameter. + // If 'suggestion' can contain zero or one element - in such case this can be used as + // a suggestion about the current version of the object to avoid read operation from + // storage to get it. // // Example: // @@ -166,5 +169,7 @@ type Interface interface { // return cur, nil, nil // } // }) - GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, precondtions *Preconditions, tryUpdate UpdateFunc) error + GuaranteedUpdate( + ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, + precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error }