Merge pull request #81435 from gyuho/with-block

Block etcd client creation until connection is up
This commit is contained in:
Kubernetes Prow Robot 2019-08-29 18:07:27 -07:00 committed by GitHub
commit 6c62ddc85b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 78 additions and 28 deletions

View File

@ -28,6 +28,7 @@ go_library(
"//vendor/github.com/coreos/etcd/client:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -18,6 +18,7 @@ package main
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
@ -25,10 +26,9 @@ import (
"strings"
"time"
"context"
clientv2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"google.golang.org/grpc"
"k8s.io/klog"
)
@ -113,7 +113,13 @@ func (e *CombinedEtcdClient) clientV2() (clientv2.KeysAPI, error) {
}
func (e *CombinedEtcdClient) clientV3() (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{Endpoints: []string{e.endpoint()}})
return clientv3.New(clientv3.Config{
Endpoints: []string{e.endpoint()},
DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
})
}
// Backup creates a backup of an etcd2 data directory at the given backupDir.

View File

@ -174,13 +174,7 @@
"golang.org/x/text/unicode/bidi",
"golang.org/x/text/unicode/norm",
"golang.org/x/text/width",
"golang.org/x/time/rate"
]
},
{
"SelectorRegexp": "google[.]golang[.]org",
"AllowedPrefixes": [
"google.golang.org/genproto/googleapis/rpc/status",
"golang.org/x/time/rate",
"google.golang.org/grpc"
]
},

View File

@ -14,6 +14,7 @@ go_library(
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -29,6 +29,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"github.com/pkg/errors"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
@ -126,12 +127,20 @@ func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client
return etcdClient, nil
}
// dialTimeout is the timeout for failing to establish a connection.
// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649)
const dialTimeout = 20 * time.Second
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 20 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return err
@ -161,8 +170,11 @@ type Member struct {
func (c *Client) GetMemberID(peerURL string) (uint64, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 30 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return 0, err
@ -188,8 +200,11 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) {
func (c *Client) RemoveMember(id uint64) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 30 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
@ -232,8 +247,11 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 20 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
@ -320,8 +338,11 @@ func (c *Client) ClusterAvailable() (bool, error) {
func (c *Client) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 5 * time.Second,
TLS: c.TLS,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err

View File

@ -20,6 +20,7 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.3.0
google.golang.org/grpc v1.23.0
gopkg.in/yaml.v2 v2.2.2
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0

View File

@ -61,6 +61,7 @@ go_test(
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],

View File

@ -21,9 +21,11 @@ import (
"reflect"
"strings"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"
"sigs.k8s.io/yaml"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@ -147,8 +149,12 @@ func TestInvalidObjectMetaInStorage(t *testing.T) {
t.Fatal(err)
}
etcdConfig := clientv3.Config{
Endpoints: restOptions.StorageConfig.Transport.ServerList,
TLS: tlsConfig,
Endpoints: restOptions.StorageConfig.Transport.ServerList,
DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
}
etcdclient, err := clientv3.New(etcdConfig)
if err != nil {

View File

@ -21,9 +21,11 @@ import (
"reflect"
"strings"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"
"sigs.k8s.io/yaml"
@ -331,8 +333,12 @@ func TestPruningFromStorage(t *testing.T) {
t.Fatal(err)
}
etcdConfig := clientv3.Config{
Endpoints: restOptions.StorageConfig.Transport.ServerList,
TLS: tlsConfig,
Endpoints: restOptions.StorageConfig.Transport.ServerList,
DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
}
etcdclient, err := clientv3.New(etcdConfig)
if err != nil {

View File

@ -13,6 +13,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apiserver/pkg/registry/generic"
@ -112,8 +113,12 @@ func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, cl
}
cfg := clientv3.Config{
Endpoints: config.ServerList,
TLS: tlsConfig,
Endpoints: config.ServerList,
DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
}
c, err := clientv3.New(cfg)

View File

@ -111,6 +111,7 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error)
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
},

View File

@ -21,6 +21,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)

View File

@ -20,6 +20,7 @@ import (
"testing"
"time"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
@ -82,8 +83,12 @@ func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, cl
}
cfg := clientv3.Config{
Endpoints: config.ServerList,
TLS: tlsConfig,
Endpoints: config.ServerList,
DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
}
c, err := clientv3.New(cfg)