Migrate Delete and GuaranteedUpdate to Kubernetes client

This commit is contained in:
Marek Siarkowicz 2024-05-31 11:07:38 +02:00
parent 53ca81da29
commit 2fcd321c42

View File

@ -19,13 +19,13 @@ package etcd3
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"path" "path"
"reflect" "reflect"
"strings" "strings"
"time" "time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes" "go.etcd.io/etcd/client/v3/kubernetes"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
@ -347,21 +347,16 @@ func (s *store) conditionalDelete(
} }
startTime := time.Now() startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If( txnResp, err := s.client.Kubernetes.OptimisticDelete(ctx, key, origState.rev, kubernetes.DeleteOptions{
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), GetOnFailure: true,
).Then( })
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequest("delete", s.groupResourceString, err, startTime) metrics.RecordEtcdRequest("delete", s.groupResourceString, err, startTime)
if err != nil { if err != nil {
return err return err
} }
if !txnResp.Succeeded { if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(ctx, getResp, key, v, false, skipTransformDecode) origState, err = s.getState(ctx, txnResp.KV, key, v, false, skipTransformDecode)
if err != nil { if err != nil {
return err return err
} }
@ -369,15 +364,8 @@ func (s *store) conditionalDelete(
continue continue
} }
if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil {
return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses))
}
deleteResp := txnResp.Responses[0].GetResponseDeleteRange()
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
if !skipTransformDecode { if !skipTransformDecode {
err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision) err = s.decoder.Decode(origState.data, out, txnResp.Revision)
if err != nil { if err != nil {
recordDecodeError(s.groupResourceString, key) recordDecodeError(s.groupResourceString, key)
return err return err
@ -510,20 +498,21 @@ func (s *store) GuaranteedUpdate(
} }
span.AddEvent("TransformToStorage succeeded") span.AddEvent("TransformToStorage succeeded")
opts, err := s.ttlOpts(ctx, int64(ttl)) var lease clientv3.LeaseID
if ttl != 0 {
lease, err = s.leaseManager.GetLease(ctx, int64(ttl))
if err != nil { if err != nil {
return err return err
} }
}
span.AddEvent("Transaction prepared") span.AddEvent("Transaction prepared")
startTime := time.Now() startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(preparedKey), "=", origState.rev), txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, origState.rev, kubernetes.PutOptions{
).Then( GetOnFailure: true,
clientv3.OpPut(preparedKey, string(newData), opts...), LeaseID: lease,
).Else( })
clientv3.OpGet(preparedKey),
).Commit()
metrics.RecordEtcdRequest("update", s.groupResourceString, err, startTime) metrics.RecordEtcdRequest("update", s.groupResourceString, err, startTime)
if err != nil { if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error())) span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
@ -532,10 +521,8 @@ func (s *store) GuaranteedUpdate(
span.AddEvent("Txn call completed") span.AddEvent("Txn call completed")
span.AddEvent("Transaction committed") span.AddEvent("Transaction committed")
if !txnResp.Succeeded { if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey) klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
skipTransformDecode := false origState, err = s.getState(ctx, txnResp.KV, preparedKey, v, ignoreNotFound, skipTransformDecode)
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound, skipTransformDecode)
if err != nil { if err != nil {
return err return err
} }
@ -543,9 +530,8 @@ func (s *store) GuaranteedUpdate(
origStateIsCurrent = true origStateIsCurrent = true
continue continue
} }
putResp := txnResp.Responses[0].GetResponsePut()
err = s.decoder.Decode(data, destination, putResp.Header.Revision) err = s.decoder.Decode(data, destination, txnResp.Revision)
if err != nil { if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey) recordDecodeError(s.groupResourceString, preparedKey)
@ -885,12 +871,12 @@ func (s *store) watchContext(ctx context.Context) context.Context {
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) { func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) {
return func() (*objState, error) { return func() (*objState, error) {
startTime := time.Now() startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key) getResp, err := s.client.Kubernetes.Get(ctx, key, kubernetes.GetOptions{})
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime) metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s.getState(ctx, getResp, key, v, ignoreNotFound, skipTransformDecode) return s.getState(ctx, getResp.KV, key, v, ignoreNotFound, skipTransformDecode)
} }
} }
@ -900,7 +886,7 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value
// storage will be transformed and decoded. // storage will be transformed and decoded.
// NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields // NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields
// of the objState will be nil, and 'stale' will be set to true. // of the objState will be nil, and 'stale' will be set to true.
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) { func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) {
state := &objState{ state := &objState{
meta: &storage.ResponseMeta{}, meta: &storage.ResponseMeta{},
} }
@ -911,7 +897,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
state.obj = reflect.New(v.Type()).Interface().(runtime.Object) state.obj = reflect.New(v.Type()).Interface().(runtime.Object)
} }
if len(getResp.Kvs) == 0 { if kv == nil {
if !ignoreNotFound { if !ignoreNotFound {
return nil, storage.NewKeyNotFoundError(key, 0) return nil, storage.NewKeyNotFoundError(key, 0)
} }
@ -919,7 +905,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
return nil, err return nil, err
} }
} else { } else {
state.rev = getResp.Kvs[0].ModRevision state.rev = kv.ModRevision
state.meta.ResourceVersion = uint64(state.rev) state.meta.ResourceVersion = uint64(state.rev)
if skipTransformDecode { if skipTransformDecode {
@ -929,7 +915,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
return state, nil return state, nil
} }
data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key)) data, stale, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(key))
if err != nil { if err != nil {
return nil, storage.NewInternalError(err) return nil, storage.NewInternalError(err)
} }
@ -989,19 +975,6 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim
return ret, ttl, nil return ret, ttl, nil
} }
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
id, err := s.leaseManager.GetLease(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
}
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is // validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage. // greater than the most recent actualRevision available from storage.
func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error { func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {