From 249ad2a6137cc8f1e0ccb7f0aef9ff4ba38927b9 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 31 May 2024 10:27:51 +0200 Subject: [PATCH 1/7] Add etcd kubernetes interface package to vendor --- .../apiserver/pkg/storage/etcd3/store.go | 1 + .../etcd/client/v3/kubernetes/client.go | 136 +++++++++++++++++ .../etcd/client/v3/kubernetes/interface.go | 140 ++++++++++++++++++ vendor/modules.txt | 1 + 4 files changed, 278 insertions(+) create mode 100644 vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go create mode 100644 vendor/go.etcd.io/etcd/client/v3/kubernetes/interface.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 37e454e90e9..d437ee3d232 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -27,6 +27,7 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" + _ "go.etcd.io/etcd/client/v3/kubernetes" "go.opentelemetry.io/otel/attribute" apierrors "k8s.io/apimachinery/pkg/api/errors" diff --git a/vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go b/vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go new file mode 100644 index 00000000000..11f2a456447 --- /dev/null +++ b/vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go @@ -0,0 +1,136 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "fmt" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// New creates Client from config. +// Caller is responsible to call Close() to clean up client. +func New(cfg clientv3.Config) (*Client, error) { + c, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + kc := &Client{ + Client: c, + } + kc.Kubernetes = kc + return kc, nil +} + +type Client struct { + *clientv3.Client + Kubernetes Interface +} + +var _ Interface = (*Client)(nil) + +func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) { + rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1)) + if err != nil { + return resp, err + } + resp.Revision = rangeResp.Header.Revision + if len(rangeResp.Kvs) == 1 { + resp.KV = rangeResp.Kvs[0] + } + return resp, nil +} + +func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) { + rangeStart := prefix + if opts.Continue != "" { + rangeStart = opts.Continue + } + rangeEnd := clientv3.GetPrefixRangeEnd(prefix) + rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision)) + if err != nil { + return resp, err + } + resp.Kvs = rangeResp.Kvs + resp.Count = rangeResp.Count + resp.Revision = rangeResp.Header.Revision + return resp, nil +} + +func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) { + resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly()) + if err != nil { + return 0, err + } + return resp.Count, nil +} + +func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) { + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)), + ) + + if opts.GetOnFailure { + txn = txn.Else(clientv3.OpGet(key)) + } + + txnResp, err := txn.Commit() + if err != nil { + return resp, err + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if opts.GetOnFailure && !txnResp.Succeeded { + if len(txnResp.Responses) == 0 { + return resp, fmt.Errorf("invalid OptimisticPut response: %v", txnResp.Responses) + } + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) { + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpDelete(key), + ) + if opts.GetOnFailure { + txn = txn.Else(clientv3.OpGet(key)) + } + txnResp, err := txn.Commit() + if err != nil { + return resp, err + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if opts.GetOnFailure && !txnResp.Succeeded { + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue { + getResponse := resp.GetResponseRange() + if len(getResponse.Kvs) == 1 { + return getResponse.Kvs[0] + } + return nil +} diff --git a/vendor/go.etcd.io/etcd/client/v3/kubernetes/interface.go b/vendor/go.etcd.io/etcd/client/v3/kubernetes/interface.go new file mode 100644 index 00000000000..19b82a62927 --- /dev/null +++ b/vendor/go.etcd.io/etcd/client/v3/kubernetes/interface.go @@ -0,0 +1,140 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// Interface defines the minimal client-side interface that Kubernetes requires +// to interact with etcd. Methods below are standard etcd operations with +// semantics adjusted to better suit Kubernetes' needs. +type Interface interface { + // Get retrieves a single key-value pair from etcd. + // + // If opts.Revision is set to a non-zero value, the key-value pair is retrieved at the specified revision. + // If the required revision has been compacted, the request will fail with ErrCompacted. + Get(ctx context.Context, key string, opts GetOptions) (GetResponse, error) + + // List retrieves key-value pairs with the specified prefix, ordered lexicographically by key. + // + // If opts.Revision is non-zero, the key-value pairs are retrieved at the specified revision. + // If the required revision has been compacted, the request will fail with ErrCompacted. + // If opts.Limit is greater than zero, the number of returned key-value pairs is bounded by the limit. + // If opts.Continue is not empty, the listing will start from the key immediately after the one specified by Continue. + // The Continue value should be the last key returned in a previous paginated ListResponse. + List(ctx context.Context, prefix string, opts ListOptions) (ListResponse, error) + + // Count returns the number of keys with the specified prefix. + // + // Currently, there are no options for the Count operation. However, a placeholder options struct (CountOptions) + // is provided for future extensibility in case options become necessary. + Count(ctx context.Context, prefix string, opts CountOptions) (int64, error) + + // OptimisticPut creates or updates a key-value pair if the key has not been modified or created + // since the revision specified in expectedRevision. + // + // An OptimisticPut fails if the key has been modified since expectedRevision. + OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (PutResponse, error) + + // OptimisticDelete deletes the key-value pair if it hasn't been modified since the revision + // specified in expectedRevision. + // + // An OptimisticDelete fails if the key has been modified since expectedRevision. + OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (DeleteResponse, error) +} + +type GetOptions struct { + // Revision is the point-in-time of the etcd key-value store to use for the Get operation. + // If Revision is 0, it gets the latest value. + Revision int64 +} + +type ListOptions struct { + // Revision is the point-in-time of the etcd key-value store to use for the List operation. + // If Revision is 0, it gets the latest values. + Revision int64 + + // Limit is the maximum number of keys to return for a List operation. + // 0 means no limitation. + Limit int64 + + // Continue is a key from which to resume the List operation, excluding the given key. + // It should be set to the last key from a previous ListResponse when paginating. + Continue string +} + +// CountOptions is a placeholder for potential future options for the Count operation. +type CountOptions struct{} + +type PutOptions struct { + // GetOnFailure specifies whether to return the modified key-value pair if the Put operation fails due to a revision mismatch. + GetOnFailure bool + + // LeaseID is the ID of a lease to associate with the key allowing for automatic deletion after lease expires after it's TTL (time to live). + // Deprecated: Should be replaced with TTL when Interface starts using one lease per object. + LeaseID clientv3.LeaseID +} + +type DeleteOptions struct { + // GetOnFailure specifies whether to return the modified key-value pair if the Delete operation fails due to a revision mismatch. + GetOnFailure bool +} + +type GetResponse struct { + // KV is the key-value pair retrieved from etcd. + KV *mvccpb.KeyValue + + // Revision is the revision of the key-value store at the time of the Get operation. + Revision int64 +} + +type ListResponse struct { + // Kvs is the list of key-value pairs retrieved from etcd, ordered lexicographically by key. + Kvs []*mvccpb.KeyValue + + // Count is the total number of keys with the specified prefix, even if not all were returned due to a limit. + Count int64 + + // Revision is the revision of the key-value store at the time of the List operation. + Revision int64 +} + +type PutResponse struct { + // KV is the created or updated key-value pair. If the Put operation failed and GetOnFailure was true, this + // will be the modified key-value pair that caused the failure. + KV *mvccpb.KeyValue + + // Succeeded indicates whether the Put operation was successful. + Succeeded bool + + // Revision is the revision of the key-value store after the Put operation. + Revision int64 +} + +type DeleteResponse struct { + // KV is the deleted key-value pair. If the Delete operation failed and GetOnFailure was true, this + // will be the modified key-value pair that caused the failure. + KV *mvccpb.KeyValue + + // Succeeded indicates whether the Delete operation was successful. + Succeeded bool + + // Revision is the revision of the key-value store after the Delete operation. + Revision int64 +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4c1db56c788..699e43c3eb5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -645,6 +645,7 @@ go.etcd.io/etcd/client/v3/concurrency go.etcd.io/etcd/client/v3/credentials go.etcd.io/etcd/client/v3/internal/endpoint go.etcd.io/etcd/client/v3/internal/resolver +go.etcd.io/etcd/client/v3/kubernetes # go.etcd.io/etcd/pkg/v3 v3.5.16 ## explicit; go 1.22 go.etcd.io/etcd/pkg/v3/adt From 066c1c05d73690b48c872f3fbc23b7722cd44fe3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 5 Jul 2024 14:19:49 +0200 Subject: [PATCH 2/7] Update recorders to wrap kubernetes.Client --- .../pkg/storage/cacher/cacher_test.go | 14 +++---- .../pkg/storage/etcd3/compact_test.go | 4 +- .../apiserver/pkg/storage/etcd3/store.go | 12 +++--- .../apiserver/pkg/storage/etcd3/store_test.go | 37 ++++++++++--------- .../pkg/storage/etcd3/testing/test_server.go | 4 +- .../storage/etcd3/testserver/test_server.go | 5 ++- .../pkg/storage/etcd3/watcher_test.go | 2 +- .../storage/storagebackend/factory/etcd3.go | 12 +++--- .../storagebackend/factory/factory_test.go | 9 +++-- 9 files changed, 52 insertions(+), 47 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index cc8f715c700..faf68fd1995 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -180,42 +180,42 @@ func TestList(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) + storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true) } func TestListWithConsistentListFromCache(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) + storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true) } func TestConsistentList(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false) + storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, false) } func TestConsistentListWithConsistentListFromCache(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true) + storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, true) } func TestGetListNonRecursive(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher) + storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher) } func TestGetListNonRecursiveWithConsistentListFromCache(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher) + storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher) } func TestGetListRecursivePrefix(t *testing.T) { @@ -301,7 +301,7 @@ func TestWatch(t *testing.T) { func TestWatchFromZero(t *testing.T) { ctx, cacher, server, terminate := testSetupWithEtcdServer(t) t.Cleanup(terminate) - storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client)) + storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client)) } func TestDeleteTriggerWatch(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go index 04157ce1733..97c61e6ee17 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go @@ -27,7 +27,7 @@ import ( ) func TestCompact(t *testing.T) { - client := testserver.RunEtcd(t, nil) + client := testserver.RunEtcd(t, nil).Client ctx := context.Background() putResp, err := client.Put(ctx, "/somekey", "data") @@ -56,7 +56,7 @@ func TestCompact(t *testing.T) { // - C1 compacts first. It will succeed. // - C2 compacts after. It will fail. But it will get latest logical time, which should be larger by one. func TestCompactConflict(t *testing.T) { - client := testserver.RunEtcd(t, nil) + client := testserver.RunEtcd(t, nil).Client ctx := context.Background() putResp, err := client.Put(ctx, "/somekey", "data") diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index d437ee3d232..766821b12ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -27,7 +27,7 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" - _ "go.etcd.io/etcd/client/v3/kubernetes" + "go.etcd.io/etcd/client/v3/kubernetes" "go.opentelemetry.io/otel/attribute" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -73,7 +73,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte { var _ value.Context = authenticatedDataString("") type store struct { - client *clientv3.Client + client *kubernetes.Client codec runtime.Codec versioner storage.Versioner transformer value.Transformer @@ -100,11 +100,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface { +func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface { return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner) } -func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store { +func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store { // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' @@ -115,7 +115,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func } w := &watcher{ - client: c, + client: c.Client, codec: codec, newFunc: newFunc, groupResource: groupResource, @@ -136,7 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func groupResource: groupResource, groupResourceString: groupResource.String(), watcher: w, - leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), + leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig), decoder: decoder, } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 43d2f84751b..df8ee5c6cc1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -29,6 +29,7 @@ import ( "github.com/go-logr/logr" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" "go.etcd.io/etcd/server/v3/embed" "google.golang.org/grpc/grpclog" @@ -95,7 +96,7 @@ func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) st func TestCreate(t *testing.T) { ctx, store, etcdClient := testSetup(t) - storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient, store.codec)) + storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient.Client, store.codec)) } func TestCreateWithTTL(t *testing.T) { @@ -170,7 +171,7 @@ func TestListPaging(t *testing.T) { func TestGetListNonRecursive(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client), store) + storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store) } func TestGetListRecursivePrefix(t *testing.T) { @@ -194,8 +195,8 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes } func TestGuaranteedUpdate(t *testing.T) { - ctx, store, etcdClient := testSetup(t) - storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(etcdClient, store.codec)) + ctx, store, client := testSetup(t) + storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(client.Client, store.codec)) } func TestGuaranteedUpdateWithTTL(t *testing.T) { @@ -225,12 +226,12 @@ func TestTransformationFailure(t *testing.T) { func TestList(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestList(ctx, t, store, compactStorage(client), false) + storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false) } func TestConsistentList(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true) + storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true) } func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { @@ -258,29 +259,29 @@ func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, } } if reads := recorder.GetReadsAndReset(); reads != estimatedGetCalls { - t.Errorf("unexpected reads: %d", reads) + t.Fatalf("unexpected reads: %d, want: %d", reads, estimatedGetCalls) } } } func TestListContinuation(t *testing.T) { - ctx, store, etcdClient := testSetup(t, withRecorder()) + ctx, store, client := testSetup(t, withRecorder()) validation := checkStorageCallsInvariants( - store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder)) + store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder)) storagetesting.RunTestListContinuation(ctx, t, store, validation) } func TestListPaginationRareObject(t *testing.T) { - ctx, store, etcdClient := testSetup(t, withRecorder()) + ctx, store, client := testSetup(t, withRecorder()) validation := checkStorageCallsInvariants( - store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder)) + store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder)) storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation) } func TestListContinuationWithFilter(t *testing.T) { - ctx, store, etcdClient := testSetup(t, withRecorder()) + ctx, store, client := testSetup(t, withRecorder()) validation := checkStorageCallsInvariants( - store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder)) + store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder)) storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } @@ -299,7 +300,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { func TestListInconsistentContinuation(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client)) + storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client)) } func TestListResourceVersionMatch(t *testing.T) { @@ -499,7 +500,7 @@ func (r *clientRecorder) GetReadsAndReset() uint64 { } type setupOptions struct { - client func(testing.TB) *clientv3.Client + client func(testing.TB) *kubernetes.Client codec runtime.Codec newFunc func() runtime.Object newListFunc func() runtime.Object @@ -516,7 +517,7 @@ type setupOption func(*setupOptions) func withClientConfig(config *embed.Config) setupOption { return func(options *setupOptions) { - options.client = func(t testing.TB) *clientv3.Client { + options.client = func(t testing.TB) *kubernetes.Client { return testserver.RunEtcd(t, config) } } @@ -541,7 +542,7 @@ func withRecorder() setupOption { } func withDefaults(options *setupOptions) { - options.client = func(t testing.TB) *clientv3.Client { + options.client = func(t testing.TB) *kubernetes.Client { return testserver.RunEtcd(t, nil) } options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) @@ -556,7 +557,7 @@ func withDefaults(options *setupOptions) { var _ setupOption = withDefaults -func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *clientv3.Client) { +func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *kubernetes.Client) { setupOpts := setupOptions{} opts = append([]setupOption{withDefaults}, opts...) for _, opt := range opts { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go index c5b7ad0feea..01e1b6a180e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go @@ -19,7 +19,7 @@ package testing import ( "testing" - clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" "k8s.io/apiserver/pkg/storage/etcd3/testserver" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -27,7 +27,7 @@ import ( // EtcdTestServer encapsulates the datastructures needed to start local instance for testing type EtcdTestServer struct { - V3Client *clientv3.Client + V3Client *kubernetes.Client } func (e *EtcdTestServer) Terminate(t testing.TB) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go index ff1c04e220e..0df860aa216 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go @@ -26,6 +26,7 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" "go.etcd.io/etcd/server/v3/embed" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" @@ -81,7 +82,7 @@ func NewTestConfig(t testing.TB) *embed.Config { // RunEtcd starts an embedded etcd server with the provided config // (or NewTestConfig(t) if nil), and returns a client connected to the server. // The server is terminated when the test ends. -func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client { +func RunEtcd(t testing.TB, cfg *embed.Config) *kubernetes.Client { t.Helper() if cfg == nil { @@ -112,7 +113,7 @@ func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client { t.Fatal(err) } - client, err := clientv3.New(clientv3.Config{ + client, err := kubernetes.New(clientv3.Config{ TLS: tlsConfig, Endpoints: e.Server.Cluster().ClientURLs(), DialTimeout: 10 * time.Second, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 1d809ff6f44..236fa9acabd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -64,7 +64,7 @@ func TestDeleteTriggerWatch(t *testing.T) { func TestWatchFromZero(t *testing.T) { ctx, store, client := testSetup(t) - storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client)) + storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client.Client)) } // TestWatchFromNonZero tests that diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index d629087d7d1..3bb248674df 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -33,6 +33,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -228,7 +229,7 @@ func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error) return nil, err } return &etcd3ProberMonitor{ - client: client, + client: client.Client, prefix: c.Prefix, endpoints: c.Transport.ServerList, }, nil @@ -282,7 +283,7 @@ func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetric }, nil } -var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { +var newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) { tlsInfo := transport.TLSInfo{ CertFile: c.CertFile, KeyFile: c.KeyFile, @@ -352,7 +353,7 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e Logger: etcd3ClientLogger, } - return clientv3.New(cfg) + return kubernetes.New(cfg) } type runningCompactor struct { @@ -384,10 +385,11 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration } key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile} if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval { - compactorClient, err := newETCD3Client(c) + client, err := newETCD3Client(c) if err != nil { return nil, err } + compactorClient := client.Client if foundBefore { // replace compactor @@ -439,7 +441,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu // decorate the KV instance so we can track etcd latency per request. client.KV = etcd3.NewETCDLatencyTracker(client.KV) - stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval) + stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client.Client, c.DBMetricPollInterval) if err != nil { return nil, nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go index 7b9106dd9e6..c513b5d4c47 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory_test.go @@ -27,6 +27,7 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/kubernetes" "k8s.io/apiserver/pkg/storage/etcd3/testserver" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -111,7 +112,7 @@ func TestCreateHealthcheck(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ready := make(chan struct{}) tc.cfg.Transport.ServerList = client.Endpoints() - newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { + newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) { defer close(ready) dummyKV := mockKV{ get: func(ctx context.Context) (*clientv3.GetResponse, error) { @@ -211,7 +212,7 @@ func TestCreateReadycheck(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ready := make(chan struct{}) tc.cfg.Transport.ServerList = client.Endpoints() - newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { + newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) { defer close(ready) dummyKV := mockKV{ get: func(ctx context.Context) (*clientv3.GetResponse, error) { @@ -277,7 +278,7 @@ func TestRateLimitHealthcheck(t *testing.T) { ready := make(chan struct{}) var counter uint64 - newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { + newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) { defer close(ready) dummyKV := mockKV{ get: func(ctx context.Context) (*clientv3.GetResponse, error) { @@ -373,7 +374,7 @@ func TestTimeTravelHealthcheck(t *testing.T) { signal := make(chan struct{}) var counter uint64 - newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { + newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) { defer close(ready) dummyKV := mockKV{ get: func(ctx context.Context) (*clientv3.GetResponse, error) { From 092a6d1e0d9f91d543369f40a1fb65e9c3bff034 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 31 May 2024 10:54:49 +0200 Subject: [PATCH 3/7] Migrate Get to Kubernetes client --- .../src/k8s.io/apiserver/pkg/storage/etcd3/store.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 766821b12ed..e178bd830ae 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -161,29 +161,28 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou return err } startTime := time.Now() - getResp, err := s.client.KV.Get(ctx, preparedKey) + getResp, err := s.client.Kubernetes.Get(ctx, preparedKey, kubernetes.GetOptions{}) metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime) if err != nil { return err } - if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil { + if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil { return err } - if len(getResp.Kvs) == 0 { + if getResp.KV == nil { if opts.IgnoreNotFound { return runtime.SetZeroValue(out) } return storage.NewKeyNotFoundError(preparedKey, 0) } - kv := getResp.Kvs[0] - data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey)) + data, _, err := s.transformer.TransformFromStorage(ctx, getResp.KV.Value, authenticatedDataString(preparedKey)) if err != nil { return storage.NewInternalError(err) } - err = s.decoder.Decode(data, out, kv.ModRevision) + err = s.decoder.Decode(data, out, getResp.KV.ModRevision) if err != nil { recordDecodeError(s.groupResourceString, preparedKey) return err From 53ca81da29511596cac0301e7a4f527309d380e3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 31 May 2024 10:57:41 +0200 Subject: [PATCH 4/7] Migrate Create to Kubernetes client --- .../apiserver/pkg/storage/etcd3/store.go | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index e178bd830ae..c938465da87 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -217,9 +217,12 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, } span.AddEvent("Encode succeeded", attribute.Int("len", len(data))) - opts, err := s.ttlOpts(ctx, int64(ttl)) - if err != nil { - return err + var lease clientv3.LeaseID + if ttl != 0 { + lease, err = s.leaseManager.GetLease(ctx, int64(ttl)) + if err != nil { + return err + } } newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(preparedKey)) @@ -230,11 +233,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, span.AddEvent("TransformToStorage succeeded") startTime := time.Now() - txnResp, err := s.client.KV.Txn(ctx).If( - notFound(preparedKey), - ).Then( - clientv3.OpPut(preparedKey, string(newData), opts...), - ).Commit() + txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, 0, kubernetes.PutOptions{LeaseID: lease}) metrics.RecordEtcdRequest("create", s.groupResourceString, err, startTime) if err != nil { span.AddEvent("Txn call failed", attribute.String("err", err.Error())) @@ -247,8 +246,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, } if out != nil { - putResp := txnResp.Responses[0].GetResponsePut() - err = s.decoder.Decode(data, out, putResp.Header.Revision) + err = s.decoder.Decode(data, out, txnResp.Revision) if err != nil { span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) recordDecodeError(s.groupResourceString, preparedKey) @@ -1052,10 +1050,6 @@ func recordDecodeError(resource string, key string) { klog.V(4).Infof("Decoding %s \"%s\" failed", resource, key) } -func notFound(key string) clientv3.Cmp { - return clientv3.Compare(clientv3.ModRevision(key), "=", 0) -} - // getTypeName returns type name of an object for reporting purposes. func getTypeName(obj interface{}) string { return reflect.TypeOf(obj).String() From 2fcd321c426d2567e74b97da38b4860638385007 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 31 May 2024 11:07:38 +0200 Subject: [PATCH 5/7] Migrate Delete and GuaranteedUpdate to Kubernetes client --- .../apiserver/pkg/storage/etcd3/store.go | 77 ++++++------------- 1 file changed, 25 insertions(+), 52 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index c938465da87..481d496e297 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -19,13 +19,13 @@ package etcd3 import ( "bytes" "context" - "errors" "fmt" "path" "reflect" "strings" "time" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/kubernetes" "go.opentelemetry.io/otel/attribute" @@ -347,21 +347,16 @@ func (s *store) conditionalDelete( } startTime := time.Now() - txnResp, err := s.client.KV.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), - ).Then( - clientv3.OpDelete(key), - ).Else( - clientv3.OpGet(key), - ).Commit() + txnResp, err := s.client.Kubernetes.OptimisticDelete(ctx, key, origState.rev, kubernetes.DeleteOptions{ + GetOnFailure: true, + }) metrics.RecordEtcdRequest("delete", s.groupResourceString, err, startTime) if err != nil { return err } if !txnResp.Succeeded { - getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) - origState, err = s.getState(ctx, getResp, key, v, false, skipTransformDecode) + origState, err = s.getState(ctx, txnResp.KV, key, v, false, skipTransformDecode) if err != nil { return err } @@ -369,15 +364,8 @@ func (s *store) conditionalDelete( continue } - if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil { - return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses)) - } - deleteResp := txnResp.Responses[0].GetResponseDeleteRange() - if deleteResp.Header == nil { - return errors.New("invalid DeleteRange response - nil header") - } if !skipTransformDecode { - err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision) + err = s.decoder.Decode(origState.data, out, txnResp.Revision) if err != nil { recordDecodeError(s.groupResourceString, key) return err @@ -510,20 +498,21 @@ func (s *store) GuaranteedUpdate( } span.AddEvent("TransformToStorage succeeded") - opts, err := s.ttlOpts(ctx, int64(ttl)) - if err != nil { - return err + var lease clientv3.LeaseID + if ttl != 0 { + lease, err = s.leaseManager.GetLease(ctx, int64(ttl)) + if err != nil { + return err + } } span.AddEvent("Transaction prepared") startTime := time.Now() - txnResp, err := s.client.KV.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(preparedKey), "=", origState.rev), - ).Then( - clientv3.OpPut(preparedKey, string(newData), opts...), - ).Else( - clientv3.OpGet(preparedKey), - ).Commit() + + txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, origState.rev, kubernetes.PutOptions{ + GetOnFailure: true, + LeaseID: lease, + }) metrics.RecordEtcdRequest("update", s.groupResourceString, err, startTime) if err != nil { span.AddEvent("Txn call failed", attribute.String("err", err.Error())) @@ -532,10 +521,8 @@ func (s *store) GuaranteedUpdate( span.AddEvent("Txn call completed") span.AddEvent("Transaction committed") if !txnResp.Succeeded { - getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey) - skipTransformDecode := false - origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound, skipTransformDecode) + origState, err = s.getState(ctx, txnResp.KV, preparedKey, v, ignoreNotFound, skipTransformDecode) if err != nil { return err } @@ -543,9 +530,8 @@ func (s *store) GuaranteedUpdate( origStateIsCurrent = true continue } - putResp := txnResp.Responses[0].GetResponsePut() - err = s.decoder.Decode(data, destination, putResp.Header.Revision) + err = s.decoder.Decode(data, destination, txnResp.Revision) if err != nil { span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error())) recordDecodeError(s.groupResourceString, preparedKey) @@ -885,12 +871,12 @@ func (s *store) watchContext(ctx context.Context) context.Context { func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) { return func() (*objState, error) { startTime := time.Now() - getResp, err := s.client.KV.Get(ctx, key) + getResp, err := s.client.Kubernetes.Get(ctx, key, kubernetes.GetOptions{}) metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime) if err != nil { return nil, err } - return s.getState(ctx, getResp, key, v, ignoreNotFound, skipTransformDecode) + return s.getState(ctx, getResp.KV, key, v, ignoreNotFound, skipTransformDecode) } } @@ -900,7 +886,7 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value // storage will be transformed and decoded. // NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields // of the objState will be nil, and 'stale' will be set to true. -func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) { +func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) { state := &objState{ meta: &storage.ResponseMeta{}, } @@ -911,7 +897,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key state.obj = reflect.New(v.Type()).Interface().(runtime.Object) } - if len(getResp.Kvs) == 0 { + if kv == nil { if !ignoreNotFound { return nil, storage.NewKeyNotFoundError(key, 0) } @@ -919,7 +905,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key return nil, err } } else { - state.rev = getResp.Kvs[0].ModRevision + state.rev = kv.ModRevision state.meta.ResourceVersion = uint64(state.rev) if skipTransformDecode { @@ -929,7 +915,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key return state, nil } - data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key)) + data, stale, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(key)) if err != nil { return nil, storage.NewInternalError(err) } @@ -989,19 +975,6 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim return ret, ttl, nil } -// ttlOpts returns client options based on given ttl. -// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length -func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) { - if ttl == 0 { - return nil, nil - } - id, err := s.leaseManager.GetLease(ctx, ttl) - if err != nil { - return nil, err - } - return []clientv3.OpOption{clientv3.WithLease(id)}, nil -} - // validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is // greater than the most recent actualRevision available from storage. func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error { From e192ac31a425e186230285829947df3854d08125 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 31 May 2024 11:08:48 +0200 Subject: [PATCH 6/7] Migrate Count to Kubernetes client --- staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 481d496e297..753cbe899a1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -573,12 +573,12 @@ func (s *store) Count(key string) (int64, error) { } startTime := time.Now() - getResp, err := s.client.KV.Get(context.Background(), preparedKey, clientv3.WithRange(clientv3.GetPrefixRangeEnd(preparedKey)), clientv3.WithCountOnly()) + count, err := s.client.Kubernetes.Count(context.Background(), preparedKey, kubernetes.CountOptions{}) metrics.RecordEtcdRequest("listWithCount", preparedKey, err, startTime) if err != nil { return 0, err } - return getResp.Count, nil + return count, nil } // ReadinessCheck implements storage.Interface. From a16a364324c218b703d033edf89187aa60d9dd87 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 31 May 2024 14:36:22 +0200 Subject: [PATCH 7/7] Migrate GetList to Kubernetes client --- .../apiserver/pkg/storage/etcd3/store.go | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 753cbe899a1..4a4e416a577 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -629,7 +629,7 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto // GetList implements storage.Interface. func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - preparedKey, err := s.prepareKey(key) + keyPrefix, err := s.prepareKey(key) if err != nil { return err } @@ -654,27 +654,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption // get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys // with prefix "/a" will return all three, while with prefix "/a/" will return only // "/a/b" which is the correct answer. - if opts.Recursive && !strings.HasSuffix(preparedKey, "/") { - preparedKey += "/" + if opts.Recursive && !strings.HasSuffix(keyPrefix, "/") { + keyPrefix += "/" } - keyPrefix := preparedKey // set the appropriate clientv3 options to filter the returned data set - var limitOption *clientv3.OpOption limit := opts.Predicate.Limit - var paging bool - options := make([]clientv3.OpOption, 0, 4) - if opts.Predicate.Limit > 0 { - paging = true - options = append(options, clientv3.WithLimit(limit)) - limitOption = &options[len(options)-1] - } - - if opts.Recursive { - rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) - options = append(options, clientv3.WithRange(rangeEnd)) - } - + paging := opts.Predicate.Limit > 0 newItemFunc := getNewItemFunc(listObj, v) var continueRV, withRev int64 @@ -684,20 +670,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption if err != nil { return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) } - preparedKey = continueKey } if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil { return err } - if withRev != 0 { - options = append(options, clientv3.WithRev(withRev)) - } - // loop until we have filled the requested limit from etcd or there are no more results var lastKey []byte var hasMore bool - var getResp *clientv3.GetResponse + var getResp kubernetes.ListResponse var numFetched int var numEvald int // Because these metrics are for understanding the costs of handling LIST requests, @@ -714,24 +695,27 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption for { startTime := time.Now() - getResp, err = s.client.KV.Get(ctx, preparedKey, options...) + getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{ + Revision: withRev, + Limit: limit, + Continue: continueKey, + }) metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime) if err != nil { return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix) } numFetched += len(getResp.Kvs) - if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil { + if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil { return err } - hasMore = getResp.More + hasMore = int64(len(getResp.Kvs)) < getResp.Count - if len(getResp.Kvs) == 0 && getResp.More { + if len(getResp.Kvs) == 0 && hasMore { return fmt.Errorf("no results were found, but etcd indicated there were more values remaining") } // indicate to the client which resource version was returned, and use the same resource version for subsequent requests. if withRev == 0 { - withRev = getResp.Header.Revision - options = append(options, clientv3.WithRev(withRev)) + withRev = getResp.Revision } // avoid small allocations for the result slice, since this can be called in many @@ -779,6 +763,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption // free kv early. Long lists can take O(seconds) to decode. getResp.Kvs[i] = nil } + continueKey = string(lastKey) + "\x00" // no more results remain or we didn't request paging if !hasMore || !paging { @@ -796,9 +781,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption if limit > maxLimit { limit = maxLimit } - *limitOption = clientv3.WithLimit(limit) } - preparedKey = string(lastKey) + "\x00" } if v.IsNil() { @@ -813,6 +796,26 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount) } +func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (kubernetes.ListResponse, error) { + if recursive { + return s.client.Kubernetes.List(ctx, keyPrefix, options) + } + getResp, err := s.client.Kubernetes.Get(ctx, keyPrefix, kubernetes.GetOptions{ + Revision: options.Revision, + }) + var resp kubernetes.ListResponse + if getResp.KV != nil { + resp.Kvs = []*mvccpb.KeyValue{getResp.KV} + resp.Count = 1 + resp.Revision = getResp.Revision + } else { + resp.Kvs = []*mvccpb.KeyValue{} + resp.Count = 0 + resp.Revision = getResp.Revision + } + return resp, err +} + // growSlice takes a slice value and grows its capacity up // to the maximum of the passed sizes or maxCapacity, whichever // is smaller. Above maxCapacity decisions about allocation are left