From 800dd19fc23f14f9e83f09ff40155395af9f7cb0 Mon Sep 17 00:00:00 2001 From: SataQiu <1527062125@qq.com> Date: Mon, 15 Jun 2020 22:43:21 +0800 Subject: [PATCH] increase robustness for kubeadm etcd operations Signed-off-by: SataQiu <1527062125@qq.com> --- cmd/kubeadm/app/util/etcd/etcd.go | 141 ++++++++++++++---------------- 1 file changed, 67 insertions(+), 74 deletions(-) diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index fe3693dcd86..b9f9c2f2763 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -42,11 +42,11 @@ import ( const etcdTimeout = 2 * time.Second -// Exponential backoff for etcd operations +// Exponential backoff for etcd operations (up to ~200 seconds) var etcdBackoff = wait.Backoff{ - Steps: 11, - Duration: 50 * time.Millisecond, - Factor: 2.0, + Steps: 18, + Duration: 100 * time.Millisecond, + Factor: 1.5, Jitter: 0.1, } @@ -210,29 +210,27 @@ func getRawEtcdEndpointsFromClusterStatus(client clientset.Interface) ([]string, return etcdEndpoints, nil } -// dialTimeout is the timeout for failing to establish a connection. -// It is set to >20 seconds as times shorter than that will cause TLS connections to fail -// on heavily loaded arm64 CPUs (issue #64649) -const dialTimeout = 40 * time.Second - // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. func (c *Client) Sync() error { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: dialTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) - if err != nil { - return err - } - defer cli.Close() - // Syncs the list of endpoints + var cli *clientv3.Client var lastError error - err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + var err error + cli, err = clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: etcdTimeout, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + }, + TLS: c.TLS, + }) + if err != nil { + lastError = err + return false, nil + } + defer cli.Close() + ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) err = cli.Sync(ctx) cancel() @@ -260,23 +258,24 @@ type Member struct { } func (c *Client) listMembers() (*clientv3.MemberListResponse, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: dialTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) - if err != nil { - return nil, err - } - defer cli.Close() - // Gets the member list var lastError error var resp *clientv3.MemberListResponse - err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: etcdTimeout, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + }, + TLS: c.TLS, + }) + if err != nil { + lastError = err + return false, nil + } + defer cli.Close() + ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) resp, err = cli.MemberList(ctx) cancel() @@ -324,23 +323,24 @@ func (c *Client) ListMembers() ([]Member, error) { // RemoveMember notifies an etcd cluster to remove an existing member func (c *Client) RemoveMember(id uint64) ([]Member, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: dialTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) - if err != nil { - return nil, err - } - defer cli.Close() - // Remove an existing member from the cluster var lastError error var resp *clientv3.MemberRemoveResponse - err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: etcdTimeout, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + }, + TLS: c.TLS, + }) + if err != nil { + lastError = err + return false, nil + } + defer cli.Close() + ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) resp, err = cli.MemberRemove(ctx, id) cancel() @@ -374,18 +374,10 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs) } - // Exponential backoff for the MemberAdd operation (up to ~200 seconds) - etcdBackoffAdd := wait.Backoff{ - Steps: 18, - Duration: 100 * time.Millisecond, - Factor: 1.5, - Jitter: 0.1, - } - // Adds a new member to the cluster var lastError error var resp *clientv3.MemberAddResponse - err = wait.ExponentialBackoff(etcdBackoffAdd, func() (bool, error) { + err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: c.Endpoints, DialTimeout: etcdTimeout, @@ -447,25 +439,26 @@ func (c *Client) CheckClusterHealth() error { // getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: dialTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - }, - TLS: c.TLS, - }) - if err != nil { - return nil, err - } - defer cli.Close() - clusterStatus := make(map[string]*clientv3.StatusResponse) for _, ep := range c.Endpoints { // Gets the member status var lastError error var resp *clientv3.StatusResponse - err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: etcdTimeout, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + }, + TLS: c.TLS, + }) + if err != nil { + lastError = err + return false, nil + } + defer cli.Close() + ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) resp, err = cli.Status(ctx, ep) cancel()