From 2fcd321c426d2567e74b97da38b4860638385007 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 31 May 2024 11:07:38 +0200 Subject: [PATCH] Migrate Delete and GuaranteedUpdate to Kubernetes client --- .../apiserver/pkg/storage/etcd3/store.go | 77 ++++++------------- 1 file changed, 25 insertions(+), 52 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index c938465da87..481d496e297 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -19,13 +19,13 @@ package etcd3 import ( "bytes" "context" - "errors" "fmt" "path" "reflect" "strings" "time" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/kubernetes" "go.opentelemetry.io/otel/attribute" @@ -347,21 +347,16 @@ func (s *store) conditionalDelete( } startTime := time.Now() - txnResp, err := s.client.KV.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), - ).Then( - clientv3.OpDelete(key), - ).Else( - clientv3.OpGet(key), - ).Commit() + txnResp, err := s.client.Kubernetes.OptimisticDelete(ctx, key, origState.rev, kubernetes.DeleteOptions{ + GetOnFailure: true, + }) metrics.RecordEtcdRequest("delete", s.groupResourceString, err, startTime) if err != nil { return err } if !txnResp.Succeeded { - getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) - origState, err = s.getState(ctx, getResp, key, v, false, skipTransformDecode) + origState, err = s.getState(ctx, txnResp.KV, key, v, false, skipTransformDecode) if err != nil { return err } @@ -369,15 +364,8 @@ func (s *store) conditionalDelete( continue } - if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil { - return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses)) - } - deleteResp := txnResp.Responses[0].GetResponseDeleteRange() - if deleteResp.Header == nil { - return errors.New("invalid DeleteRange response - nil header") - } if !skipTransformDecode { - err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision) + err = s.decoder.Decode(origState.data, out, txnResp.Revision) if err != nil { recordDecodeError(s.groupResourceString, key) return err @@ -510,20 +498,21 @@ func (s *store) GuaranteedUpdate( } span.AddEvent("TransformToStorage succeeded") - opts, err := s.ttlOpts(ctx, int64(ttl)) - if err != nil { - return err + var lease clientv3.LeaseID + if ttl != 0 { + lease, err = s.leaseManager.GetLease(ctx, int64(ttl)) + if err != nil { + return err + } } span.AddEvent("Transaction prepared") startTime := time.Now() - txnResp, err := s.client.KV.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(preparedKey), "=", origState.rev), - ).Then( - clientv3.OpPut(preparedKey, string(newData), opts...), - ).Else( - clientv3.OpGet(preparedKey), - ).Commit() + + txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, origState.rev, kubernetes.PutOptions{ + GetOnFailure: true, + LeaseID: lease, + }) metrics.RecordEtcdRequest("update", s.groupResourceString, err, startTime) if err != nil { span.AddEvent("Txn call failed", attribute.String("err", err.Error())) @@ -532,10 +521,8 @@ func (s *store) GuaranteedUpdate( span.AddEvent("Txn call completed") span.AddEvent("Transaction committed") if !txnResp.Succeeded { - getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey) - skipTransformDecode := false - origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound, skipTransformDecode) + origState, err = s.getState(ctx, txnResp.KV, preparedKey, v, ignoreNotFound, skipTransformDecode) if err != nil { return err } @@ -543,9 +530,8 @@ func (s *store) GuaranteedUpdate( origStateIsCurrent = true continue } - putResp := txnResp.Responses[0].GetResponsePut() - err = s.decoder.Decode(data, destination, putResp.Header.Revision) + err = s.decoder.Decode(data, destination, txnResp.Revision) if err != nil { span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) recordDecodeError(s.groupResourceString, preparedKey) @@ -885,12 +871,12 @@ func (s *store) watchContext(ctx context.Context) context.Context { func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) { return func() (*objState, error) { startTime := time.Now() - getResp, err := s.client.KV.Get(ctx, key) + getResp, err := s.client.Kubernetes.Get(ctx, key, kubernetes.GetOptions{}) metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime) if err != nil { return nil, err } - return s.getState(ctx, getResp, key, v, ignoreNotFound, skipTransformDecode) + return s.getState(ctx, getResp.KV, key, v, ignoreNotFound, skipTransformDecode) } } @@ -900,7 +886,7 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value // storage will be transformed and decoded. // NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields // of the objState will be nil, and 'stale' will be set to true. -func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) { +func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) { state := &objState{ meta: &storage.ResponseMeta{}, } @@ -911,7 +897,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key state.obj = reflect.New(v.Type()).Interface().(runtime.Object) } - if len(getResp.Kvs) == 0 { + if kv == nil { if !ignoreNotFound { return nil, storage.NewKeyNotFoundError(key, 0) } @@ -919,7 +905,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key return nil, err } } else { - state.rev = getResp.Kvs[0].ModRevision + state.rev = kv.ModRevision state.meta.ResourceVersion = uint64(state.rev) if skipTransformDecode { @@ -929,7 +915,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key return state, nil } - data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key)) + data, stale, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(key)) if err != nil { return nil, storage.NewInternalError(err) } @@ -989,19 +975,6 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim return ret, ttl, nil } -// ttlOpts returns client options based on given ttl. -// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length -func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) { - if ttl == 0 { - return nil, nil - } - id, err := s.leaseManager.GetLease(ctx, ttl) - if err != nil { - return nil, err - } - return []clientv3.OpOption{clientv3.WithLease(id)}, nil -} - // validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is // greater than the most recent actualRevision available from storage. func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {