k8s/apiextensions-apiserver/test/integration: block etcd client creation until connection is up

The new etcd balancer (>3.3.14, 3.4.0) uses an asynchronous resolver for
endpoints. Without "WithBlock", the client may return before the
connection is up.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Gyuho Lee 2019-08-14 17:28:54 -07:00
parent b61433ef0f
commit a254d0e2a6
4 changed files with 24 additions and 6 deletions

View File

@ -21,9 +21,11 @@ import (
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@ -147,8 +149,12 @@ func TestInvalidObjectMetaInStorage(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
etcdConfig := clientv3.Config{ etcdConfig := clientv3.Config{
Endpoints: restOptions.StorageConfig.Transport.ServerList, Endpoints: restOptions.StorageConfig.Transport.ServerList,
TLS: tlsConfig, DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
} }
etcdclient, err := clientv3.New(etcdConfig) etcdclient, err := clientv3.New(etcdConfig)
if err != nil { if err != nil {

View File

@ -21,9 +21,11 @@ import (
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
@ -331,8 +333,12 @@ func TestPruningFromStorage(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
etcdConfig := clientv3.Config{ etcdConfig := clientv3.Config{
Endpoints: restOptions.StorageConfig.Transport.ServerList, Endpoints: restOptions.StorageConfig.Transport.ServerList,
TLS: tlsConfig, DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
} }
etcdclient, err := clientv3.New(etcdConfig) etcdclient, err := clientv3.New(etcdConfig)
if err != nil { if err != nil {

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
@ -112,8 +113,12 @@ func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, cl
} }
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: config.ServerList, Endpoints: config.ServerList,
TLS: tlsConfig, DialTimeout: 20 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
} }
c, err := clientv3.New(cfg) c, err := clientv3.New(cfg)

View File

@ -111,6 +111,7 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error)
DialKeepAliveTime: keepaliveTime, DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout, DialKeepAliveTimeout: keepaliveTimeout,
DialOptions: []grpc.DialOption{ DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
}, },