kubeadm: Use internal etcd client through an interface

This commit is contained in:
Daniel Lipovetsky 2023-05-08 13:31:23 -07:00
parent 5fd29596ef
commit fc1b228779
No known key found for this signature in database
GPG Key ID: 559B3DEDDDF8FF82

View File

@ -66,27 +66,69 @@ type ClusterInterrogator interface {
RemoveMember(id uint64) ([]Member, error) RemoveMember(id uint64) ([]Member, error)
} }
type etcdClient interface {
// Close shuts down the client's etcd connections.
Close() error
// Endpoints lists the registered endpoints for the client.
Endpoints() []string
// MemberList lists the current cluster membership.
MemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
// MemberAdd adds a new member into the cluster.
MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
// MemberAddAsLearner adds a new learner member into the cluster.
MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
// MemberRemove removes an existing member from the cluster.
MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error)
// Status gets the status of the endpoint.
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
Sync(ctx context.Context) error
}
// Client provides connection parameters for an etcd cluster // Client provides connection parameters for an etcd cluster
type Client struct { type Client struct {
Endpoints []string Endpoints []string
TLS *tls.Config
newEtcdClient func(endpoints []string) (etcdClient, error)
} }
// New creates a new EtcdCluster client // New creates a new EtcdCluster client
func New(endpoints []string, ca, cert, key string) (*Client, error) { func New(endpoints []string, ca, cert, key string) (*Client, error) {
client := Client{Endpoints: endpoints} client := Client{Endpoints: endpoints}
var err error
var tlsConfig *tls.Config
if ca != "" || cert != "" || key != "" { if ca != "" || cert != "" || key != "" {
tlsInfo := transport.TLSInfo{ tlsInfo := transport.TLSInfo{
CertFile: cert, CertFile: cert,
KeyFile: key, KeyFile: key,
TrustedCAFile: ca, TrustedCAFile: ca,
} }
tlsConfig, err := tlsInfo.ClientConfig() tlsConfig, err = tlsInfo.ClientConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }
client.TLS = tlsConfig }
client.newEtcdClient = func(endpoints []string) (etcdClient, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
})
} }
return &client, nil return &client, nil
@ -192,24 +234,16 @@ func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership. // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error { func (c *Client) Sync() error {
// Syncs the list of endpoints // Syncs the list of endpoints
var cli *clientv3.Client var cli etcdClient
var lastError error var lastError error
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
var err error var err error
cli, err = clientv3.New(clientv3.Config{ cli, err = c.newEtcdClient(c.Endpoints)
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil { if err != nil {
lastError = err lastError = err
return false, nil return false, nil
} }
defer cli.Close() defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
err = cli.Sync(ctx) err = cli.Sync(ctx)
cancel() cancel()
@ -241,14 +275,7 @@ func (c *Client) listMembers() (*clientv3.MemberListResponse, error) {
var lastError error var lastError error
var resp *clientv3.MemberListResponse var resp *clientv3.MemberListResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{ cli, err := c.newEtcdClient(c.Endpoints)
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil { if err != nil {
lastError = err lastError = err
return false, nil return false, nil
@ -306,14 +333,7 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
var lastError error var lastError error
var resp *clientv3.MemberRemoveResponse var resp *clientv3.MemberRemoveResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{ cli, err := c.newEtcdClient(c.Endpoints)
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil { if err != nil {
lastError = err lastError = err
return false, nil return false, nil
@ -365,14 +385,7 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs) return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
} }
cli, err := clientv3.New(clientv3.Config{ cli, err := c.newEtcdClient(c.Endpoints)
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -482,14 +495,7 @@ func (c *Client) MemberPromote(learnerID uint64) error {
} }
klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %016x", learnerID) klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %016x", learnerID)
cli, err := clientv3.New(clientv3.Config{ cli, err := c.newEtcdClient(c.Endpoints)
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil { if err != nil {
return err return err
} }
@ -537,14 +543,7 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error)
var lastError error var lastError error
var resp *clientv3.StatusResponse var resp *clientv3.StatusResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{ cli, err := c.newEtcdClient(c.Endpoints)
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil { if err != nil {
lastError = err lastError = err
return false, nil return false, nil