mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Merge pull request #35415 from wojtek-t/avoid_get
Automatic merge from submit-queue Try to avoid Get to etcd in GuaranteedUpdate in Cacher
This commit is contained in:
commit
6f80ec91d6
@ -462,7 +462,21 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc) error {
|
func (c *Cacher) GuaranteedUpdate(
|
||||||
|
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
|
||||||
|
preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
|
||||||
|
// Ignore the suggestion and try to pass down the current version of the object
|
||||||
|
// read from cache.
|
||||||
|
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
|
||||||
|
glog.Errorf("GetByKey returned error: %v", err)
|
||||||
|
} else if exists {
|
||||||
|
currObj, copyErr := api.Scheme.Copy(elem.(*storeElement).Object)
|
||||||
|
if copyErr == nil {
|
||||||
|
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
|
||||||
|
}
|
||||||
|
glog.Errorf("couldn't copy object: %v", copyErr)
|
||||||
|
}
|
||||||
|
// If we couldn't get the object, fallback to no-suggestion.
|
||||||
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
|
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,7 +434,10 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
|
func (h *etcdHelper) GuaranteedUpdate(
|
||||||
|
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
|
||||||
|
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
|
||||||
|
// Ignore the suggestion about current object.
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
|
@ -211,22 +211,33 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
|
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
|
||||||
func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
|
func (s *store) GuaranteedUpdate(
|
||||||
|
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
|
||||||
|
precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
|
||||||
v, err := conversion.EnforcePtr(out)
|
v, err := conversion.EnforcePtr(out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("unable to convert output object to pointer")
|
panic("unable to convert output object to pointer")
|
||||||
}
|
}
|
||||||
key = keyWithPrefix(s.pathPrefix, key)
|
key = keyWithPrefix(s.pathPrefix, key)
|
||||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
|
||||||
if err != nil {
|
var origState *objState
|
||||||
return err
|
if len(suggestion) == 1 && suggestion[0] != nil {
|
||||||
}
|
origState, err = s.getStateFromObject(suggestion[0])
|
||||||
for {
|
|
||||||
origState, err := s.getState(getResp, key, v, ignoreNotFound)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
origState, err = s.getState(getResp, key, v, ignoreNotFound)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
|
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -260,8 +271,12 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !txnResp.Succeeded {
|
if !txnResp.Succeeded {
|
||||||
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
||||||
glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
|
glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
|
||||||
|
origState, err = s.getState(getResp, key, v, ignoreNotFound)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
putResp := txnResp.Responses[0].GetResponsePut()
|
putResp := txnResp.Responses[0].GetResponsePut()
|
||||||
@ -369,6 +384,32 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
|
|||||||
return state, nil
|
return state, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
|
||||||
|
state := &objState{
|
||||||
|
obj: obj,
|
||||||
|
meta: &storage.ResponseMeta{},
|
||||||
|
}
|
||||||
|
|
||||||
|
rv, err := s.versioner.ObjectResourceVersion(obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't get resource version: %v", err)
|
||||||
|
}
|
||||||
|
state.rev = int64(rv)
|
||||||
|
state.meta.ResourceVersion = uint64(state.rev)
|
||||||
|
|
||||||
|
// Compute the serialized form - for that we need to temporarily clean
|
||||||
|
// its resource version field (those are not stored in etcd).
|
||||||
|
if err := s.versioner.UpdateObject(obj, 0); err != nil {
|
||||||
|
return nil, errors.New("resourceVersion cannot be set on objects store in etcd")
|
||||||
|
}
|
||||||
|
state.data, err = runtime.Encode(s.codec, obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.versioner.UpdateObject(state.obj, uint64(rv))
|
||||||
|
return state, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
|
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
|
||||||
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
|
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -147,6 +147,9 @@ type Interface interface {
|
|||||||
// or zero value in 'ptrToType' parameter otherwise.
|
// or zero value in 'ptrToType' parameter otherwise.
|
||||||
// If the object to update has the same value as previous, it won't do any update
|
// If the object to update has the same value as previous, it won't do any update
|
||||||
// but will return the object in 'ptrToType' parameter.
|
// but will return the object in 'ptrToType' parameter.
|
||||||
|
// If 'suggestion' can contain zero or one element - in such case this can be used as
|
||||||
|
// a suggestion about the current version of the object to avoid read operation from
|
||||||
|
// storage to get it.
|
||||||
//
|
//
|
||||||
// Example:
|
// Example:
|
||||||
//
|
//
|
||||||
@ -166,5 +169,7 @@ type Interface interface {
|
|||||||
// return cur, nil, nil
|
// return cur, nil, nil
|
||||||
// }
|
// }
|
||||||
// })
|
// })
|
||||||
GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, precondtions *Preconditions, tryUpdate UpdateFunc) error
|
GuaranteedUpdate(
|
||||||
|
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
|
||||||
|
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user