Merge pull request #125258 from serathius/etcd-kubernetes-interface

Etcd kubernetes interface
Kubernetes Prow Robot, 2024-10-24 14:16:52 +01:00 (committed by GitHub)
commit cadb1508a9
12 changed files with 404 additions and 152 deletions


@ -180,42 +180,42 @@ func TestList(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
}
func TestListWithConsistentListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
}
func TestConsistentList(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, false)
}
func TestConsistentListWithConsistentListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true)
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, true)
}
func TestGetListNonRecursive(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
}
func TestGetListNonRecursiveWithConsistentListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
}
func TestGetListRecursivePrefix(t *testing.T) {
@ -301,7 +301,7 @@ func TestWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) {
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client))
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client))
}
func TestDeleteTriggerWatch(t *testing.T) {


@ -27,7 +27,7 @@ import (
)
func TestCompact(t *testing.T) {
client := testserver.RunEtcd(t, nil)
client := testserver.RunEtcd(t, nil).Client
ctx := context.Background()
putResp, err := client.Put(ctx, "/somekey", "data")
@ -56,7 +56,7 @@ func TestCompact(t *testing.T) {
// - C1 compacts first. It will succeed.
// - C2 compacts after. It will fail. But it will get the latest logical time, which should be larger by one.
func TestCompactConflict(t *testing.T) {
client := testserver.RunEtcd(t, nil)
client := testserver.RunEtcd(t, nil).Client
ctx := context.Background()
putResp, err := client.Put(ctx, "/somekey", "data")


@ -19,14 +19,15 @@ package etcd3
import (
"bytes"
"context"
"errors"
"fmt"
"path"
"reflect"
"strings"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.opentelemetry.io/otel/attribute"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -72,7 +73,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
var _ value.Context = authenticatedDataString("")
type store struct {
client *clientv3.Client
client *kubernetes.Client
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
@ -99,11 +100,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
@ -114,7 +115,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
}
w := &watcher{
client: c,
client: c.Client,
codec: codec,
newFunc: newFunc,
groupResource: groupResource,
@ -135,7 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
groupResource: groupResource,
groupResourceString: groupResource.String(),
watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
decoder: decoder,
}
@ -160,29 +161,28 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
return err
}
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, preparedKey)
getResp, err := s.client.Kubernetes.Get(ctx, preparedKey, kubernetes.GetOptions{})
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return err
}
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if getResp.KV == nil {
if opts.IgnoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(preparedKey, 0)
}
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
data, _, err := s.transformer.TransformFromStorage(ctx, getResp.KV.Value, authenticatedDataString(preparedKey))
if err != nil {
return storage.NewInternalError(err)
}
err = s.decoder.Decode(data, out, kv.ModRevision)
err = s.decoder.Decode(data, out, getResp.KV.ModRevision)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
@ -217,9 +217,12 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}
span.AddEvent("Encode succeeded", attribute.Int("len", len(data)))
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
var lease clientv3.LeaseID
if ttl != 0 {
lease, err = s.leaseManager.GetLease(ctx, int64(ttl))
if err != nil {
return err
}
}
newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(preparedKey))
@ -230,11 +233,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
span.AddEvent("TransformToStorage succeeded")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(preparedKey),
).Then(
clientv3.OpPut(preparedKey, string(newData), opts...),
).Commit()
txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, 0, kubernetes.PutOptions{LeaseID: lease})
metrics.RecordEtcdRequest("create", s.groupResourceString, err, startTime)
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
@ -247,8 +246,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
err = s.decoder.Decode(data, out, putResp.Header.Revision)
err = s.decoder.Decode(data, out, txnResp.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
@ -349,21 +347,16 @@ func (s *store) conditionalDelete(
}
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
txnResp, err := s.client.Kubernetes.OptimisticDelete(ctx, key, origState.rev, kubernetes.DeleteOptions{
GetOnFailure: true,
})
metrics.RecordEtcdRequest("delete", s.groupResourceString, err, startTime)
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(ctx, getResp, key, v, false, skipTransformDecode)
origState, err = s.getState(ctx, txnResp.KV, key, v, false, skipTransformDecode)
if err != nil {
return err
}
@ -371,15 +364,8 @@ func (s *store) conditionalDelete(
continue
}
if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil {
return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses))
}
deleteResp := txnResp.Responses[0].GetResponseDeleteRange()
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
if !skipTransformDecode {
err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision)
err = s.decoder.Decode(origState.data, out, txnResp.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
@ -512,20 +498,21 @@ func (s *store) GuaranteedUpdate(
}
span.AddEvent("TransformToStorage succeeded")
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
var lease clientv3.LeaseID
if ttl != 0 {
lease, err = s.leaseManager.GetLease(ctx, int64(ttl))
if err != nil {
return err
}
}
span.AddEvent("Transaction prepared")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(preparedKey), "=", origState.rev),
).Then(
clientv3.OpPut(preparedKey, string(newData), opts...),
).Else(
clientv3.OpGet(preparedKey),
).Commit()
txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, origState.rev, kubernetes.PutOptions{
GetOnFailure: true,
LeaseID: lease,
})
metrics.RecordEtcdRequest("update", s.groupResourceString, err, startTime)
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
@ -534,10 +521,8 @@ func (s *store) GuaranteedUpdate(
span.AddEvent("Txn call completed")
span.AddEvent("Transaction committed")
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
skipTransformDecode := false
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound, skipTransformDecode)
origState, err = s.getState(ctx, txnResp.KV, preparedKey, v, ignoreNotFound, skipTransformDecode)
if err != nil {
return err
}
@ -545,9 +530,8 @@ func (s *store) GuaranteedUpdate(
origStateIsCurrent = true
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
err = s.decoder.Decode(data, destination, putResp.Header.Revision)
err = s.decoder.Decode(data, destination, txnResp.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
@ -589,12 +573,12 @@ func (s *store) Count(key string) (int64, error) {
}
startTime := time.Now()
getResp, err := s.client.KV.Get(context.Background(), preparedKey, clientv3.WithRange(clientv3.GetPrefixRangeEnd(preparedKey)), clientv3.WithCountOnly())
count, err := s.client.Kubernetes.Count(context.Background(), preparedKey, kubernetes.CountOptions{})
metrics.RecordEtcdRequest("listWithCount", preparedKey, err, startTime)
if err != nil {
return 0, err
}
return getResp.Count, nil
return count, nil
}
// ReadinessCheck implements storage.Interface.
@ -645,7 +629,7 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
preparedKey, err := s.prepareKey(key)
keyPrefix, err := s.prepareKey(key)
if err != nil {
return err
}
@ -670,27 +654,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer.
if opts.Recursive && !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
if opts.Recursive && !strings.HasSuffix(keyPrefix, "/") {
keyPrefix += "/"
}
keyPrefix := preparedKey
// set the appropriate clientv3 options to filter the returned data set
var limitOption *clientv3.OpOption
limit := opts.Predicate.Limit
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if opts.Predicate.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(limit))
limitOption = &options[len(options)-1]
}
if opts.Recursive {
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
}
paging := opts.Predicate.Limit > 0
newItemFunc := getNewItemFunc(listObj, v)
var continueRV, withRev int64
@ -700,20 +670,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
preparedKey = continueKey
}
if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil {
return err
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}
// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
var hasMore bool
var getResp *clientv3.GetResponse
var getResp kubernetes.ListResponse
var numFetched int
var numEvald int
// Because these metrics are for understanding the costs of handling LIST requests,
@ -730,24 +695,27 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
for {
startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{
Revision: withRev,
Limit: limit,
Continue: continueKey,
})
metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime)
if err != nil {
return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix)
}
numFetched += len(getResp.Kvs)
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil {
return err
}
hasMore = getResp.More
hasMore = int64(len(getResp.Kvs)) < getResp.Count
if len(getResp.Kvs) == 0 && getResp.More {
if len(getResp.Kvs) == 0 && hasMore {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
}
// indicate to the client which resource version was returned, and use the same resource version for subsequent requests.
if withRev == 0 {
withRev = getResp.Header.Revision
options = append(options, clientv3.WithRev(withRev))
withRev = getResp.Revision
}
// avoid small allocations for the result slice, since this can be called in many
@ -795,6 +763,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// free kv early. Long lists can take O(seconds) to decode.
getResp.Kvs[i] = nil
}
continueKey = string(lastKey) + "\x00"
// no more results remain or we didn't request paging
if !hasMore || !paging {
@ -812,9 +781,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if limit > maxLimit {
limit = maxLimit
}
*limitOption = clientv3.WithLimit(limit)
}
preparedKey = string(lastKey) + "\x00"
}
if v.IsNil() {
@ -829,6 +796,26 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount)
}
func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (kubernetes.ListResponse, error) {
if recursive {
return s.client.Kubernetes.List(ctx, keyPrefix, options)
}
getResp, err := s.client.Kubernetes.Get(ctx, keyPrefix, kubernetes.GetOptions{
Revision: options.Revision,
})
var resp kubernetes.ListResponse
if getResp.KV != nil {
resp.Kvs = []*mvccpb.KeyValue{getResp.KV}
resp.Count = 1
resp.Revision = getResp.Revision
} else {
resp.Kvs = []*mvccpb.KeyValue{}
resp.Count = 0
resp.Revision = getResp.Revision
}
return resp, err
}
// growSlice takes a slice value and grows its capacity up
// to the maximum of the passed sizes or maxCapacity, whichever
// is smaller. Above maxCapacity decisions about allocation are left
@ -887,12 +874,12 @@ func (s *store) watchContext(ctx context.Context) context.Context {
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) {
return func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
getResp, err := s.client.Kubernetes.Get(ctx, key, kubernetes.GetOptions{})
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return nil, err
}
return s.getState(ctx, getResp, key, v, ignoreNotFound, skipTransformDecode)
return s.getState(ctx, getResp.KV, key, v, ignoreNotFound, skipTransformDecode)
}
}
@ -902,7 +889,7 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value
// storage will be transformed and decoded.
// NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields
// of the objState will be nil, and 'stale' will be set to true.
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) {
func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) {
state := &objState{
meta: &storage.ResponseMeta{},
}
@ -913,7 +900,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
state.obj = reflect.New(v.Type()).Interface().(runtime.Object)
}
if len(getResp.Kvs) == 0 {
if kv == nil {
if !ignoreNotFound {
return nil, storage.NewKeyNotFoundError(key, 0)
}
@ -921,7 +908,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
return nil, err
}
} else {
state.rev = getResp.Kvs[0].ModRevision
state.rev = kv.ModRevision
state.meta.ResourceVersion = uint64(state.rev)
if skipTransformDecode {
@ -931,7 +918,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
return state, nil
}
data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key))
data, stale, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(key))
if err != nil {
return nil, storage.NewInternalError(err)
}
@ -991,19 +978,6 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim
return ret, ttl, nil
}
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
id, err := s.leaseManager.GetLease(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
}
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage.
func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
@ -1052,10 +1026,6 @@ func recordDecodeError(resource string, key string) {
klog.V(4).Infof("Decoding %s \"%s\" failed", resource, key)
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}
// getTypeName returns type name of an object for reporting purposes.
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()

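The heart of the change is the store above: hand-rolled clientv3 transactions are replaced with calls through the new kubernetes.Interface. A minimal sketch of the create path before and after, using only APIs that appear in this diff (the function names and standalone package are illustrative; the real Create also transforms values, records metrics, and reports conflicts):

package sketch

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/kubernetes"
)

// createOld mirrors the transaction the store built by hand before this PR:
// "put key iff it does not exist yet", optionally attaching a lease.
func createOld(ctx context.Context, c *clientv3.Client, key string, value []byte, lease clientv3.LeaseID) (int64, error) {
	txnResp, err := c.KV.Txn(ctx).If(
		clientv3.Compare(clientv3.ModRevision(key), "=", 0), // the removed notFound(key) helper
	).Then(
		clientv3.OpPut(key, string(value), clientv3.WithLease(lease)),
	).Commit()
	if err != nil {
		return 0, err
	}
	return txnResp.Header.Revision, nil
}

// createNew expresses the same operation through the new interface;
// expectedRevision 0 means "succeed only if the key does not exist yet".
func createNew(ctx context.Context, c *kubernetes.Client, key string, value []byte, lease clientv3.LeaseID) (int64, error) {
	resp, err := c.Kubernetes.OptimisticPut(ctx, key, value, 0, kubernetes.PutOptions{LeaseID: lease})
	if err != nil {
		return 0, err
	}
	return resp.Revision, nil
}

// Both sketches omit checking Succeeded, which the real store uses to
// report "key already exists" conflicts.

Pushing the transaction construction down into the etcd client keeps the store focused on Kubernetes semantics and gives etcd a small, documented surface to maintain.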

@ -29,6 +29,7 @@ import (
"github.com/go-logr/logr"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.etcd.io/etcd/server/v3/embed"
"google.golang.org/grpc/grpclog"
@ -95,7 +96,7 @@ func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) st
func TestCreate(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient, store.codec))
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient.Client, store.codec))
}
func TestCreateWithTTL(t *testing.T) {
@ -170,7 +171,7 @@ func TestListPaging(t *testing.T) {
func TestGetListNonRecursive(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client), store)
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store)
}
func TestGetListRecursivePrefix(t *testing.T) {
@ -194,8 +195,8 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes
}
func TestGuaranteedUpdate(t *testing.T) {
ctx, store, etcdClient := testSetup(t)
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(etcdClient, store.codec))
ctx, store, client := testSetup(t)
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(client.Client, store.codec))
}
func TestGuaranteedUpdateWithTTL(t *testing.T) {
@ -225,12 +226,12 @@ func TestTransformationFailure(t *testing.T) {
func TestList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false)
}
func TestConsistentList(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true)
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true)
}
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
@ -258,29 +259,29 @@ func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer,
}
}
if reads := recorder.GetReadsAndReset(); reads != estimatedGetCalls {
t.Errorf("unexpected reads: %d", reads)
t.Fatalf("unexpected reads: %d, want: %d", reads, estimatedGetCalls)
}
}
}
func TestListContinuation(t *testing.T) {
ctx, store, etcdClient := testSetup(t, withRecorder())
ctx, store, client := testSetup(t, withRecorder())
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
storagetesting.RunTestListContinuation(ctx, t, store, validation)
}
func TestListPaginationRareObject(t *testing.T) {
ctx, store, etcdClient := testSetup(t, withRecorder())
ctx, store, client := testSetup(t, withRecorder())
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation)
}
func TestListContinuationWithFilter(t *testing.T) {
ctx, store, etcdClient := testSetup(t, withRecorder())
ctx, store, client := testSetup(t, withRecorder())
validation := checkStorageCallsInvariants(
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
}
@ -299,7 +300,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
func TestListInconsistentContinuation(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client))
}
func TestListResourceVersionMatch(t *testing.T) {
@ -499,7 +500,7 @@ func (r *clientRecorder) GetReadsAndReset() uint64 {
}
type setupOptions struct {
client func(testing.TB) *clientv3.Client
client func(testing.TB) *kubernetes.Client
codec runtime.Codec
newFunc func() runtime.Object
newListFunc func() runtime.Object
@ -516,7 +517,7 @@ type setupOption func(*setupOptions)
func withClientConfig(config *embed.Config) setupOption {
return func(options *setupOptions) {
options.client = func(t testing.TB) *clientv3.Client {
options.client = func(t testing.TB) *kubernetes.Client {
return testserver.RunEtcd(t, config)
}
}
@ -541,7 +542,7 @@ func withRecorder() setupOption {
}
func withDefaults(options *setupOptions) {
options.client = func(t testing.TB) *clientv3.Client {
options.client = func(t testing.TB) *kubernetes.Client {
return testserver.RunEtcd(t, nil)
}
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
@ -556,7 +557,7 @@ func withDefaults(options *setupOptions) {
var _ setupOption = withDefaults
func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *clientv3.Client) {
func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *kubernetes.Client) {
setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts {


@ -19,7 +19,7 @@ package testing
import (
"testing"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
"k8s.io/apiserver/pkg/storage/storagebackend"
@ -27,7 +27,7 @@ import (
// EtcdTestServer encapsulates the data structures needed to start a local etcd instance for testing
type EtcdTestServer struct {
V3Client *clientv3.Client
V3Client *kubernetes.Client
}
func (e *EtcdTestServer) Terminate(t testing.TB) {


@ -26,6 +26,7 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
@ -81,7 +82,7 @@ func NewTestConfig(t testing.TB) *embed.Config {
// RunEtcd starts an embedded etcd server with the provided config
// (or NewTestConfig(t) if nil), and returns a client connected to the server.
// The server is terminated when the test ends.
func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client {
func RunEtcd(t testing.TB, cfg *embed.Config) *kubernetes.Client {
t.Helper()
if cfg == nil {
@ -112,7 +113,7 @@ func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client {
t.Fatal(err)
}
client, err := clientv3.New(clientv3.Config{
client, err := kubernetes.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: e.Server.Cluster().ClientURLs(),
DialTimeout: 10 * time.Second,


@ -64,7 +64,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
func TestWatchFromZero(t *testing.T) {
ctx, store, client := testSetup(t)
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client))
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client.Client))
}
// TestWatchFromNonZero tests that


@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -228,7 +229,7 @@ func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error)
return nil, err
}
return &etcd3ProberMonitor{
client: client,
client: client.Client,
prefix: c.Prefix,
endpoints: c.Transport.ServerList,
}, nil
@ -282,7 +283,7 @@ func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetric
}, nil
}
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
var newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
@ -352,7 +353,7 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e
Logger: etcd3ClientLogger,
}
return clientv3.New(cfg)
return kubernetes.New(cfg)
}
type runningCompactor struct {
@ -384,10 +385,11 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}
key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
compactorClient, err := newETCD3Client(c)
client, err := newETCD3Client(c)
if err != nil {
return nil, err
}
compactorClient := client.Client
if foundBefore {
// replace compactor
@ -439,7 +441,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
// decorate the KV instance so we can track etcd latency per request.
client.KV = etcd3.NewETCDLatencyTracker(client.KV)
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client.Client, c.DBMetricPollInterval)
if err != nil {
return nil, nil, err
}


@ -27,6 +27,7 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -111,7 +112,7 @@ func TestCreateHealthcheck(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
@ -211,7 +212,7 @@ func TestCreateReadycheck(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ready := make(chan struct{})
tc.cfg.Transport.ServerList = client.Endpoints()
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
@ -277,7 +278,7 @@ func TestRateLimitHealthcheck(t *testing.T) {
ready := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
@ -373,7 +374,7 @@ func TestTimeTravelHealthcheck(t *testing.T) {
signal := make(chan struct{})
var counter uint64
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
defer close(ready)
dummyKV := mockKV{
get: func(ctx context.Context) (*clientv3.GetResponse, error) {

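The tests above stub newETCD3Client, which now has to hand back a *kubernetes.Client even though only the raw KV is mocked. A hedged sketch of the wrapping involved (wrapMock is a hypothetical helper; the actual tests build the value inline), mirroring the wiring kubernetes.New performs in the vendored client below:

package sketch

import (
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/kubernetes"
)

// wrapMock wraps an existing *clientv3.Client (for example one whose KV has
// been swapped for a mockKV) in a kubernetes.Client, so a stubbed
// newETCD3Client can keep returning canned responses through the new type.
func wrapMock(c *clientv3.Client) *kubernetes.Client {
	kc := &kubernetes.Client{Client: c}
	kc.Kubernetes = kc // Client implements the Interface itself
	return kc
}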
vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go (new file, 136 lines; generated, vendored)

@ -0,0 +1,136 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"fmt"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// New creates a Client from the given config.
// The caller is responsible for calling Close() to clean up the client.
func New(cfg clientv3.Config) (*Client, error) {
c, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
kc := &Client{
Client: c,
}
kc.Kubernetes = kc
return kc, nil
}
type Client struct {
*clientv3.Client
Kubernetes Interface
}
var _ Interface = (*Client)(nil)
func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1))
if err != nil {
return resp, err
}
resp.Revision = rangeResp.Header.Revision
if len(rangeResp.Kvs) == 1 {
resp.KV = rangeResp.Kvs[0]
}
return resp, nil
}
func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) {
rangeStart := prefix
if opts.Continue != "" {
rangeStart = opts.Continue
}
rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision))
if err != nil {
return resp, err
}
resp.Kvs = rangeResp.Kvs
resp.Count = rangeResp.Count
resp.Revision = rangeResp.Header.Revision
return resp, nil
}
func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) {
resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly())
if err != nil {
return 0, err
}
return resp.Count, nil
}
func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) {
txn := k.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
).Then(
clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)),
)
if opts.GetOnFailure {
txn = txn.Else(clientv3.OpGet(key))
}
txnResp, err := txn.Commit()
if err != nil {
return resp, err
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if opts.GetOnFailure && !txnResp.Succeeded {
if len(txnResp.Responses) == 0 {
return resp, fmt.Errorf("invalid OptimisticPut response: %v", txnResp.Responses)
}
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}
func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) {
txn := k.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
).Then(
clientv3.OpDelete(key),
)
if opts.GetOnFailure {
txn = txn.Else(clientv3.OpGet(key))
}
txnResp, err := txn.Commit()
if err != nil {
return resp, err
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if opts.GetOnFailure && !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}
func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
getResponse := resp.GetResponseRange()
if len(getResponse.Kvs) == 1 {
return getResponse.Kvs[0]
}
return nil
}
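A short usage sketch of this wrapper, with an illustrative endpoint and key: Kubernetes-shaped calls go through the Kubernetes field, while anything still needing raw clientv3 APIs (watches, leases, compaction) reaches the embedded *clientv3.Client, which is why call sites elsewhere in this PR unwrap with .Client:

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/kubernetes"
)

func main() {
	// Illustrative endpoint; the tests in this PR obtain a ready-made
	// client from testserver.RunEtcd instead.
	client, err := kubernetes.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 10 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()

	// Kubernetes-shaped read: one KV plus the store revision it was read at.
	resp, err := client.Kubernetes.Get(ctx, "/registry/example", kubernetes.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("revision:", resp.Revision, "found:", resp.KV != nil)

	// Raw clientv3 access still works through the embedded Client; this is
	// the same ".Client" unwrapping used for compaction in the tests above.
	if _, err := client.Client.Compact(ctx, resp.Revision); err != nil {
		panic(err)
	}
}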


@ -0,0 +1,140 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Interface defines the minimal client-side interface that Kubernetes requires
// to interact with etcd. Methods below are standard etcd operations with
// semantics adjusted to better suit Kubernetes' needs.
type Interface interface {
// Get retrieves a single key-value pair from etcd.
//
// If opts.Revision is set to a non-zero value, the key-value pair is retrieved at the specified revision.
// If the required revision has been compacted, the request will fail with ErrCompacted.
Get(ctx context.Context, key string, opts GetOptions) (GetResponse, error)
// List retrieves key-value pairs with the specified prefix, ordered lexicographically by key.
//
// If opts.Revision is non-zero, the key-value pairs are retrieved at the specified revision.
// If the required revision has been compacted, the request will fail with ErrCompacted.
// If opts.Limit is greater than zero, the number of returned key-value pairs is bounded by the limit.
// If opts.Continue is not empty, the listing will start from the key immediately after the one specified by Continue.
// The Continue value should be the last key returned in a previous paginated ListResponse.
List(ctx context.Context, prefix string, opts ListOptions) (ListResponse, error)
// Count returns the number of keys with the specified prefix.
//
// Currently, there are no options for the Count operation. However, a placeholder options struct (CountOptions)
// is provided for future extensibility in case options become necessary.
Count(ctx context.Context, prefix string, opts CountOptions) (int64, error)
// OptimisticPut creates or updates a key-value pair if the key has not been modified or created
// since the revision specified in expectedRevision.
//
// An OptimisticPut fails if the key has been modified since expectedRevision.
OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (PutResponse, error)
// OptimisticDelete deletes the key-value pair if it hasn't been modified since the revision
// specified in expectedRevision.
//
// An OptimisticDelete fails if the key has been modified since expectedRevision.
OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (DeleteResponse, error)
}
type GetOptions struct {
// Revision is the point-in-time of the etcd key-value store to use for the Get operation.
// If Revision is 0, it gets the latest value.
Revision int64
}
type ListOptions struct {
// Revision is the point-in-time of the etcd key-value store to use for the List operation.
// If Revision is 0, it gets the latest values.
Revision int64
// Limit is the maximum number of keys to return for a List operation.
// 0 means no limitation.
Limit int64
// Continue is a key from which to resume the List operation, excluding the given key.
// It should be set to the last key from a previous ListResponse when paginating.
Continue string
}
// CountOptions is a placeholder for potential future options for the Count operation.
type CountOptions struct{}
type PutOptions struct {
// GetOnFailure specifies whether to return the modified key-value pair if the Put operation fails due to a revision mismatch.
GetOnFailure bool
// LeaseID is the ID of a lease to associate with the key, allowing for automatic deletion once the lease expires at the end of its TTL (time to live).
// Deprecated: Should be replaced with TTL when Interface starts using one lease per object.
LeaseID clientv3.LeaseID
}
type DeleteOptions struct {
// GetOnFailure specifies whether to return the modified key-value pair if the Delete operation fails due to a revision mismatch.
GetOnFailure bool
}
type GetResponse struct {
// KV is the key-value pair retrieved from etcd.
KV *mvccpb.KeyValue
// Revision is the revision of the key-value store at the time of the Get operation.
Revision int64
}
type ListResponse struct {
// Kvs is the list of key-value pairs retrieved from etcd, ordered lexicographically by key.
Kvs []*mvccpb.KeyValue
// Count is the total number of keys with the specified prefix, even if not all were returned due to a limit.
Count int64
// Revision is the revision of the key-value store at the time of the List operation.
Revision int64
}
type PutResponse struct {
// KV is the created or updated key-value pair. If the Put operation failed and GetOnFailure was true, this
// will be the modified key-value pair that caused the failure.
KV *mvccpb.KeyValue
// Succeeded indicates whether the Put operation was successful.
Succeeded bool
// Revision is the revision of the key-value store after the Put operation.
Revision int64
}
type DeleteResponse struct {
// KV is the deleted key-value pair. If the Delete operation failed and GetOnFailure was true, this
// will be the modified key-value pair that caused the failure.
KV *mvccpb.KeyValue
// Succeeded indicates whether the Delete operation was successful.
Succeeded bool
// Revision is the revision of the key-value store after the Delete operation.
Revision int64
}
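GetOnFailure exists so a caller can resolve a revision conflict without a second Get round trip. A minimal sketch of that retry pattern against this interface (putWithRetry is illustrative; the store's GuaranteedUpdate above additionally re-runs the caller's update function on every retry):

package sketch

import (
	"context"

	"go.etcd.io/etcd/client/v3/kubernetes"
)

// putWithRetry writes value to key, tolerating concurrent writers by
// adopting the revision returned on conflict and retrying.
func putWithRetry(ctx context.Context, kv kubernetes.Interface, key string, value []byte) (int64, error) {
	get, err := kv.Get(ctx, key, kubernetes.GetOptions{})
	if err != nil {
		return 0, err
	}
	var expected int64 // 0 means "the key must not exist yet"
	if get.KV != nil {
		expected = get.KV.ModRevision
	}
	for {
		resp, err := kv.OptimisticPut(ctx, key, value, expected, kubernetes.PutOptions{GetOnFailure: true})
		if err != nil {
			return 0, err
		}
		if resp.Succeeded {
			return resp.Revision, nil
		}
		// Conflict: another writer changed the key. GetOnFailure returned
		// its current state, so no extra Get round trip is needed.
		expected = 0
		if resp.KV != nil {
			expected = resp.KV.ModRevision
		}
	}
}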

vendor/modules.txt (vendored; 1 line changed)

@ -645,6 +645,7 @@ go.etcd.io/etcd/client/v3/concurrency
go.etcd.io/etcd/client/v3/credentials
go.etcd.io/etcd/client/v3/internal/endpoint
go.etcd.io/etcd/client/v3/internal/resolver
go.etcd.io/etcd/client/v3/kubernetes
# go.etcd.io/etcd/pkg/v3 v3.5.16
## explicit; go 1.22
go.etcd.io/etcd/pkg/v3/adt