diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index c2439536d06..5de86cacc08 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -66,27 +66,69 @@ type ClusterInterrogator interface { RemoveMember(id uint64) ([]Member, error) } +type etcdClient interface { + // Close shuts down the client's etcd connections. + Close() error + + // Endpoints lists the registered endpoints for the client. + Endpoints() []string + + // MemberList lists the current cluster membership. + MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) + + // MemberAdd adds a new member into the cluster. + MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) + + // MemberAddAsLearner adds a new learner member into the cluster. + MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) + + // MemberRemove removes an existing member from the cluster. + MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) + + // MemberPromote promotes a member from raft learner (non-voting) to raft voting member. + MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) + + // Status gets the status of the endpoint. + Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) + + // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. + Sync(ctx context.Context) error +} + // Client provides connection parameters for an etcd cluster type Client struct { Endpoints []string - TLS *tls.Config + + newEtcdClient func(endpoints []string) (etcdClient, error) } // New creates a new EtcdCluster client func New(endpoints []string, ca, cert, key string) (*Client, error) { client := Client{Endpoints: endpoints} + var err error + var tlsConfig *tls.Config if ca != "" || cert != "" || key != "" { tlsInfo := transport.TLSInfo{ CertFile: cert, KeyFile: key, TrustedCAFile: ca, } - tlsConfig, err := tlsInfo.ClientConfig() + tlsConfig, err = tlsInfo.ClientConfig() if err != nil { return nil, err } - client.TLS = tlsConfig + } + + client.newEtcdClient = func(endpoints []string) (etcdClient, error) { + return clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: etcdTimeout, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + }, + TLS: tlsConfig, + }) } return &client, nil @@ -192,24 +234,16 @@ func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. func (c *Client) Sync() error { // Syncs the list of endpoints - var cli *clientv3.Client + var cli etcdClient var lastError error err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { var err error - cli, err = clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err = c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil } defer cli.Close() - ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) err = cli.Sync(ctx) cancel() @@ -241,14 +275,7 @@ func (c *Client) listMembers() (*clientv3.MemberListResponse, error) { var lastError error var resp *clientv3.MemberListResponse err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil @@ -306,14 +333,7 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) { var lastError error var resp *clientv3.MemberRemoveResponse err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil @@ -365,14 +385,7 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs) } - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { return nil, err } @@ -482,14 +495,7 @@ func (c *Client) MemberPromote(learnerID uint64) error { } klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %016x", learnerID) - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { return err } @@ -537,14 +543,7 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) var lastError error var resp *clientv3.StatusResponse err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: etcdTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) + cli, err := c.newEtcdClient(c.Endpoints) if err != nil { lastError = err return false, nil