kubeadm/app/util/etcd: : block etcd client creation until connection is up

The new etcd balancer (>3.3.14, 3.4.0) uses an asynchronous resolver for
endpoints. Without "WithBlock", the client may return before the
connection is up.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Gyuho Lee 2019-08-14 17:28:12 -07:00
parent 3ad4fedede
commit eb1509a1d3

View File

@ -29,6 +29,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"github.com/pkg/errors"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
@ -126,12 +127,20 @@ func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client
return etcdClient, nil
}
// dialTimeout is the timeout for failing to establish a connection.
// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649)
const dialTimeout = 20 * time.Second
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 20 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return err
@ -161,8 +170,11 @@ type Member struct {
func (c *Client) GetMemberID(peerURL string) (uint64, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 30 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return 0, err
@ -188,8 +200,11 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) {
func (c *Client) RemoveMember(id uint64) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 30 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
@ -232,8 +247,11 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 20 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
@ -320,8 +338,11 @@ func (c *Client) ClusterAvailable() (bool, error) {
func (c *Client) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 5 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err