Merge pull request #92131 from SataQiu/fix-etcd-opt-20200615

kubeadm: increase robustness for kubeadm etcd operations
This commit is contained in:
Kubernetes Prow Robot 2020-07-01 00:06:21 -07:00 committed by GitHub
commit 4c523b1981
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -42,11 +42,11 @@ import (
const etcdTimeout = 2 * time.Second
// Exponential backoff for etcd operations
// Exponential backoff for etcd operations (up to ~200 seconds)
var etcdBackoff = wait.Backoff{
Steps: 11,
Duration: 50 * time.Millisecond,
Factor: 2.0,
Steps: 18,
Duration: 100 * time.Millisecond,
Factor: 1.5,
Jitter: 0.1,
}
@ -210,29 +210,27 @@ func getRawEtcdEndpointsFromClusterStatus(client clientset.Interface) ([]string,
return etcdEndpoints, nil
}
// dialTimeout is the timeout for failing to establish a connection.
// It is set to >20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649)
const dialTimeout = 40 * time.Second
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return err
}
defer cli.Close()
// Syncs the list of endpoints
var cli *clientv3.Client
var lastError error
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
var err error
cli, err = clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
err = cli.Sync(ctx)
cancel()
@ -260,23 +258,24 @@ type Member struct {
}
func (c *Client) listMembers() (*clientv3.MemberListResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()
// Gets the member list
var lastError error
var resp *clientv3.MemberListResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberList(ctx)
cancel()
@ -324,23 +323,24 @@ func (c *Client) ListMembers() ([]Member, error) {
// RemoveMember notifies an etcd cluster to remove an existing member
func (c *Client) RemoveMember(id uint64) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()
// Remove an existing member from the cluster
var lastError error
var resp *clientv3.MemberRemoveResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberRemove(ctx, id)
cancel()
@ -374,18 +374,10 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
}
// Exponential backoff for the MemberAdd operation (up to ~200 seconds)
etcdBackoffAdd := wait.Backoff{
Steps: 18,
Duration: 100 * time.Millisecond,
Factor: 1.5,
Jitter: 0.1,
}
// Adds a new member to the cluster
var lastError error
var resp *clientv3.MemberAddResponse
err = wait.ExponentialBackoff(etcdBackoffAdd, func() (bool, error) {
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
@ -447,25 +439,26 @@ func (c *Client) CheckClusterHealth() error {
// getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down
func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()
clusterStatus := make(map[string]*clientv3.StatusResponse)
for _, ep := range c.Endpoints {
// Gets the member status
var lastError error
var resp *clientv3.StatusResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.Status(ctx, ep)
cancel()