diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 37e454e90e9..d437ee3d232 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -27,6 +27,7 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" + _ "go.etcd.io/etcd/client/v3/kubernetes" "go.opentelemetry.io/otel/attribute" apierrors "k8s.io/apimachinery/pkg/api/errors" diff --git a/vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go b/vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go new file mode 100644 index 00000000000..11f2a456447 --- /dev/null +++ b/vendor/go.etcd.io/etcd/client/v3/kubernetes/client.go @@ -0,0 +1,136 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "fmt" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// New creates Client from config. +// Caller is responsible to call Close() to clean up client. +func New(cfg clientv3.Config) (*Client, error) { + c, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + kc := &Client{ + Client: c, + } + kc.Kubernetes = kc + return kc, nil +} + +type Client struct { + *clientv3.Client + Kubernetes Interface +} + +var _ Interface = (*Client)(nil) + +func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) { + rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1)) + if err != nil { + return resp, err + } + resp.Revision = rangeResp.Header.Revision + if len(rangeResp.Kvs) == 1 { + resp.KV = rangeResp.Kvs[0] + } + return resp, nil +} + +func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) { + rangeStart := prefix + if opts.Continue != "" { + rangeStart = opts.Continue + } + rangeEnd := clientv3.GetPrefixRangeEnd(prefix) + rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision)) + if err != nil { + return resp, err + } + resp.Kvs = rangeResp.Kvs + resp.Count = rangeResp.Count + resp.Revision = rangeResp.Header.Revision + return resp, nil +} + +func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) { + resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly()) + if err != nil { + return 0, err + } + return resp.Count, nil +} + +func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) { + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)), + ) + + if opts.GetOnFailure { + txn = txn.Else(clientv3.OpGet(key)) + } + + txnResp, err := txn.Commit() + if err != nil { + return resp, err + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if opts.GetOnFailure && !txnResp.Succeeded { + if len(txnResp.Responses) == 0 { + return resp, fmt.Errorf("invalid OptimisticPut response: %v", txnResp.Responses) + } + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) { + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpDelete(key), + ) + if opts.GetOnFailure { + txn = txn.Else(clientv3.OpGet(key)) + } + txnResp, err := txn.Commit() + if err != nil { + return resp, err + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if opts.GetOnFailure && !txnResp.Succeeded { + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue { + getResponse := resp.GetResponseRange() + if len(getResponse.Kvs) == 1 { + return getResponse.Kvs[0] + } + return nil +} diff --git a/vendor/go.etcd.io/etcd/client/v3/kubernetes/interface.go b/vendor/go.etcd.io/etcd/client/v3/kubernetes/interface.go new file mode 100644 index 00000000000..19b82a62927 --- /dev/null +++ b/vendor/go.etcd.io/etcd/client/v3/kubernetes/interface.go @@ -0,0 +1,140 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// Interface defines the minimal client-side interface that Kubernetes requires +// to interact with etcd. Methods below are standard etcd operations with +// semantics adjusted to better suit Kubernetes' needs. +type Interface interface { + // Get retrieves a single key-value pair from etcd. + // + // If opts.Revision is set to a non-zero value, the key-value pair is retrieved at the specified revision. + // If the required revision has been compacted, the request will fail with ErrCompacted. + Get(ctx context.Context, key string, opts GetOptions) (GetResponse, error) + + // List retrieves key-value pairs with the specified prefix, ordered lexicographically by key. + // + // If opts.Revision is non-zero, the key-value pairs are retrieved at the specified revision. + // If the required revision has been compacted, the request will fail with ErrCompacted. + // If opts.Limit is greater than zero, the number of returned key-value pairs is bounded by the limit. + // If opts.Continue is not empty, the listing will start from the key immediately after the one specified by Continue. + // The Continue value should be the last key returned in a previous paginated ListResponse. + List(ctx context.Context, prefix string, opts ListOptions) (ListResponse, error) + + // Count returns the number of keys with the specified prefix. + // + // Currently, there are no options for the Count operation. However, a placeholder options struct (CountOptions) + // is provided for future extensibility in case options become necessary. + Count(ctx context.Context, prefix string, opts CountOptions) (int64, error) + + // OptimisticPut creates or updates a key-value pair if the key has not been modified or created + // since the revision specified in expectedRevision. + // + // An OptimisticPut fails if the key has been modified since expectedRevision. + OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (PutResponse, error) + + // OptimisticDelete deletes the key-value pair if it hasn't been modified since the revision + // specified in expectedRevision. + // + // An OptimisticDelete fails if the key has been modified since expectedRevision. + OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (DeleteResponse, error) +} + +type GetOptions struct { + // Revision is the point-in-time of the etcd key-value store to use for the Get operation. + // If Revision is 0, it gets the latest value. + Revision int64 +} + +type ListOptions struct { + // Revision is the point-in-time of the etcd key-value store to use for the List operation. + // If Revision is 0, it gets the latest values. + Revision int64 + + // Limit is the maximum number of keys to return for a List operation. + // 0 means no limitation. + Limit int64 + + // Continue is a key from which to resume the List operation, excluding the given key. + // It should be set to the last key from a previous ListResponse when paginating. + Continue string +} + +// CountOptions is a placeholder for potential future options for the Count operation. +type CountOptions struct{} + +type PutOptions struct { + // GetOnFailure specifies whether to return the modified key-value pair if the Put operation fails due to a revision mismatch. + GetOnFailure bool + + // LeaseID is the ID of a lease to associate with the key allowing for automatic deletion after lease expires after it's TTL (time to live). + // Deprecated: Should be replaced with TTL when Interface starts using one lease per object. + LeaseID clientv3.LeaseID +} + +type DeleteOptions struct { + // GetOnFailure specifies whether to return the modified key-value pair if the Delete operation fails due to a revision mismatch. + GetOnFailure bool +} + +type GetResponse struct { + // KV is the key-value pair retrieved from etcd. + KV *mvccpb.KeyValue + + // Revision is the revision of the key-value store at the time of the Get operation. + Revision int64 +} + +type ListResponse struct { + // Kvs is the list of key-value pairs retrieved from etcd, ordered lexicographically by key. + Kvs []*mvccpb.KeyValue + + // Count is the total number of keys with the specified prefix, even if not all were returned due to a limit. + Count int64 + + // Revision is the revision of the key-value store at the time of the List operation. + Revision int64 +} + +type PutResponse struct { + // KV is the created or updated key-value pair. If the Put operation failed and GetOnFailure was true, this + // will be the modified key-value pair that caused the failure. + KV *mvccpb.KeyValue + + // Succeeded indicates whether the Put operation was successful. + Succeeded bool + + // Revision is the revision of the key-value store after the Put operation. + Revision int64 +} + +type DeleteResponse struct { + // KV is the deleted key-value pair. If the Delete operation failed and GetOnFailure was true, this + // will be the modified key-value pair that caused the failure. + KV *mvccpb.KeyValue + + // Succeeded indicates whether the Delete operation was successful. + Succeeded bool + + // Revision is the revision of the key-value store after the Delete operation. + Revision int64 +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4c1db56c788..699e43c3eb5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -645,6 +645,7 @@ go.etcd.io/etcd/client/v3/concurrency go.etcd.io/etcd/client/v3/credentials go.etcd.io/etcd/client/v3/internal/endpoint go.etcd.io/etcd/client/v3/internal/resolver +go.etcd.io/etcd/client/v3/kubernetes # go.etcd.io/etcd/pkg/v3 v3.5.16 ## explicit; go 1.22 go.etcd.io/etcd/pkg/v3/adt