diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 8ee218f2287..39b27d7c1cb 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -347,263 +347,263 @@ }, { "ImportPath": "github.com/coreos/etcd/alarm", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/auth", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/auth/authpb", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/client", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/clientv3", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/compactor", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/discovery", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/error", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http/httptypes", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/auth", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/etcdserverpb", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/membership", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/stats", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/integration", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/lease", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/lease/leasehttp", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/lease/leasepb", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/mvcc", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/mvcc/backend", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/mvcc/mvccpb", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/adt", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/contention", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/crc", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/fileutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/httputil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/idutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/ioutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/logutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/netutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/pathutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/pbutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/runtime", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/schedule", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/testutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/tlsutil", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/transport", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/types", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/pkg/wait", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/raft", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/raft/raftpb", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/rafthttp", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/snap", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/snap/snappb", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/store", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/version", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/wal", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/etcd/wal/walpb", - "Comment": "v3.0.4", - "Rev": "d53923c636e0e4ab7f00cb75681b97a8f11f5a9d" + "Comment": "v3.0.6", + "Rev": "9efa00d1030d4bf62eb8e5ec130023aeb1b8e2d0" }, { "ImportPath": "github.com/coreos/go-oidc/http", diff --git a/vendor/github.com/coreos/etcd/clientv3/auth.go b/vendor/github.com/coreos/etcd/clientv3/auth.go index 1839042430b..8c3c047f4e7 100644 --- a/vendor/github.com/coreos/etcd/clientv3/auth.go +++ b/vendor/github.com/coreos/etcd/clientv3/auth.go @@ -115,32 +115,32 @@ func NewAuth(c *Client) Auth { } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false)) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { - resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false)) + resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { - resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false)) + resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { - resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false)) + resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { - resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false)) + resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } @@ -155,12 +155,12 @@ func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { } func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { - resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false)) + resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { - resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false)) + resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } @@ -170,7 +170,7 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran RangeEnd: []byte(rangeEnd), PermType: authpb.Permission_Type(permType), } - resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, grpc.FailFast(false)) + resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}) return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) } @@ -185,12 +185,12 @@ func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { } func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { - resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false)) + resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}) return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { - resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false)) + resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } diff --git a/vendor/github.com/coreos/etcd/clientv3/balancer.go b/vendor/github.com/coreos/etcd/clientv3/balancer.go index 31871b8a4b6..b7fba6a2048 100644 --- a/vendor/github.com/coreos/etcd/clientv3/balancer.go +++ b/vendor/github.com/coreos/etcd/clientv3/balancer.go @@ -17,7 +17,7 @@ package clientv3 import ( "net/url" "strings" - "sync/atomic" + "sync" "golang.org/x/net/context" "google.golang.org/grpc" @@ -26,32 +26,115 @@ import ( // simpleBalancer does the bare minimum to expose multiple eps // to the grpc reconnection code path type simpleBalancer struct { - // eps are the client's endpoints stripped of any URL scheme - eps []string - ch chan []grpc.Address - numGets uint32 + // addrs are the client's endpoints for grpc + addrs []grpc.Address + // notifyCh notifies grpc of the set of addresses for connecting + notifyCh chan []grpc.Address + + // readyc closes once the first connection is up + readyc chan struct{} + readyOnce sync.Once + + // mu protects upEps, pinAddr, and connectingAddr + mu sync.RWMutex + // upEps holds the current endpoints that have an active connection + upEps map[string]struct{} + // upc closes when upEps transitions from empty to non-zero or the balancer closes. + upc chan struct{} + + // pinAddr is the currently pinned address; set to the empty string on + // intialization and shutdown. + pinAddr string } -func newSimpleBalancer(eps []string) grpc.Balancer { - ch := make(chan []grpc.Address, 1) +func newSimpleBalancer(eps []string) *simpleBalancer { + notifyCh := make(chan []grpc.Address, 1) addrs := make([]grpc.Address, len(eps)) for i := range eps { addrs[i].Addr = getHost(eps[i]) } - ch <- addrs - return &simpleBalancer{eps: eps, ch: ch} + notifyCh <- addrs + sb := &simpleBalancer{ + addrs: addrs, + notifyCh: notifyCh, + readyc: make(chan struct{}), + upEps: make(map[string]struct{}), + upc: make(chan struct{}), + } + return sb } -func (b *simpleBalancer) Start(target string) error { return nil } -func (b *simpleBalancer) Up(addr grpc.Address) func(error) { return func(error) {} } -func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { - v := atomic.AddUint32(&b.numGets, 1) - ep := b.eps[v%uint32(len(b.eps))] - return grpc.Address{Addr: getHost(ep)}, func() {}, nil +func (b *simpleBalancer) Start(target string) error { return nil } + +func (b *simpleBalancer) ConnectNotify() <-chan struct{} { + b.mu.Lock() + defer b.mu.Unlock() + return b.upc } -func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.ch } + +func (b *simpleBalancer) Up(addr grpc.Address) func(error) { + b.mu.Lock() + if len(b.upEps) == 0 { + // notify waiting Get()s and pin first connected address + close(b.upc) + b.pinAddr = addr.Addr + } + b.upEps[addr.Addr] = struct{}{} + b.mu.Unlock() + // notify client that a connection is up + b.readyOnce.Do(func() { close(b.readyc) }) + return func(err error) { + b.mu.Lock() + delete(b.upEps, addr.Addr) + if len(b.upEps) == 0 && b.pinAddr != "" { + b.upc = make(chan struct{}) + } else if b.pinAddr == addr.Addr { + // choose new random up endpoint + for k := range b.upEps { + b.pinAddr = k + break + } + } + b.mu.Unlock() + } +} + +func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { + var addr string + for { + b.mu.RLock() + ch := b.upc + b.mu.RUnlock() + select { + case <-ch: + case <-ctx.Done(): + return grpc.Address{Addr: ""}, nil, ctx.Err() + } + b.mu.RLock() + addr = b.pinAddr + upEps := len(b.upEps) + b.mu.RUnlock() + if addr == "" { + return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing + } + if upEps > 0 { + break + } + } + return grpc.Address{Addr: addr}, func() {}, nil +} + +func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh } + func (b *simpleBalancer) Close() error { - close(b.ch) + b.mu.Lock() + close(b.notifyCh) + // terminate all waiting Get()s + b.pinAddr = "" + if len(b.upEps) == 0 { + close(b.upc) + } + b.mu.Unlock() return nil } diff --git a/vendor/github.com/coreos/etcd/clientv3/client.go b/vendor/github.com/coreos/etcd/clientv3/client.go index 66d5a3c0f96..e6903fc2f69 100644 --- a/vendor/github.com/coreos/etcd/clientv3/client.go +++ b/vendor/github.com/coreos/etcd/clientv3/client.go @@ -46,9 +46,11 @@ type Client struct { Auth Maintenance - conn *grpc.ClientConn - cfg Config - creds *credentials.TransportCredentials + conn *grpc.ClientConn + cfg Config + creds *credentials.TransportCredentials + balancer *simpleBalancer + retryWrapper retryRpcFunc ctx context.Context cancel context.CancelFunc @@ -138,11 +140,10 @@ func (c *Client) dialTarget(endpoint string) (proto string, host string, creds * return } -// dialSetupOpts gives the dial opts prioer to any authentication -func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) []grpc.DialOption { - opts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithTimeout(c.cfg.DialTimeout), +// dialSetupOpts gives the dial opts prior to any authentication +func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts []grpc.DialOption) { + if c.cfg.DialTimeout > 0 { + opts = []grpc.DialOption{grpc.WithTimeout(c.cfg.DialTimeout)} } opts = append(opts, dopts...) @@ -240,12 +241,30 @@ func newClient(cfg *Config) (*Client, error) { client.Password = cfg.Password } - b := newSimpleBalancer(cfg.Endpoints) - conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(b)) + client.balancer = newSimpleBalancer(cfg.Endpoints) + conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer)) if err != nil { return nil, err } client.conn = conn + client.retryWrapper = client.newRetryWrapper() + + // wait for a connection + if cfg.DialTimeout > 0 { + hasConn := false + waitc := time.After(cfg.DialTimeout) + select { + case <-client.balancer.readyc: + hasConn = true + case <-ctx.Done(): + case <-waitc: + } + if !hasConn { + client.cancel() + conn.Close() + return nil, grpc.ErrClientConnTimeout + } + } client.Cluster = NewCluster(client) client.KV = NewKV(client) @@ -280,8 +299,12 @@ func isHaltErr(ctx context.Context, err error) bool { return eErr != rpctypes.ErrStopped && eErr != rpctypes.ErrNoLeader } // treat etcdserver errors not recognized by the client as halting - return strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) || - strings.Contains(err.Error(), "etcdserver:") + return isConnClosing(err) || strings.Contains(err.Error(), "etcdserver:") +} + +// isConnClosing returns true if the error matches a grpc client closing error +func isConnClosing(err error) bool { + return strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) } func toErr(ctx context.Context, err error) error { @@ -289,9 +312,12 @@ func toErr(ctx context.Context, err error) error { return nil } err = rpctypes.Error(err) - if ctx.Err() != nil && strings.Contains(err.Error(), "context") { + switch { + case ctx.Err() != nil && strings.Contains(err.Error(), "context"): err = ctx.Err() - } else if strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) { + case strings.Contains(err.Error(), ErrNoAvailableEndpoints.Error()): + err = ErrNoAvailableEndpoints + case strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()): err = grpc.ErrClientConnClosing } return err diff --git a/vendor/github.com/coreos/etcd/clientv3/cluster.go b/vendor/github.com/coreos/etcd/clientv3/cluster.go index ee0244e4ee8..8b981171b5a 100644 --- a/vendor/github.com/coreos/etcd/clientv3/cluster.go +++ b/vendor/github.com/coreos/etcd/clientv3/cluster.go @@ -47,12 +47,12 @@ type cluster struct { } func NewCluster(c *Client) Cluster { - return &cluster{remote: pb.NewClusterClient(c.conn)} + return &cluster{remote: RetryClusterClient(c)} } func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { r := &pb.MemberAddRequest{PeerURLs: peerAddrs} - resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false)) + resp, err := c.remote.MemberAdd(ctx, r) if err == nil { return (*MemberAddResponse)(resp), nil } @@ -64,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { r := &pb.MemberRemoveRequest{ID: id} - resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false)) + resp, err := c.remote.MemberRemove(ctx, r) if err == nil { return (*MemberRemoveResponse)(resp), nil } @@ -78,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin // it is safe to retry on update. for { r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false)) + resp, err := c.remote.MemberUpdate(ctx, r) if err == nil { return (*MemberUpdateResponse)(resp), nil } diff --git a/vendor/github.com/coreos/etcd/clientv3/kv.go b/vendor/github.com/coreos/etcd/clientv3/kv.go index ff68f73f4a6..27f9110f803 100644 --- a/vendor/github.com/coreos/etcd/clientv3/kv.go +++ b/vendor/github.com/coreos/etcd/clientv3/kv.go @@ -82,7 +82,7 @@ type kv struct { } func NewKV(c *Client) KV { - return &kv{remote: pb.NewKVClient(c.conn)} + return &kv{remote: RetryKVClient(c)} } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { @@ -158,14 +158,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} - resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false)) + resp, err = kv.remote.Put(ctx, r) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} - resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false)) + resp, err = kv.remote.DeleteRange(ctx, r) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } diff --git a/vendor/github.com/coreos/etcd/clientv3/lease.go b/vendor/github.com/coreos/etcd/clientv3/lease.go index 18bfc3c8703..bf8919c349a 100644 --- a/vendor/github.com/coreos/etcd/clientv3/lease.go +++ b/vendor/github.com/coreos/etcd/clientv3/lease.go @@ -110,7 +110,7 @@ func NewLease(c *Client) Lease { l := &lessor{ donec: make(chan struct{}), keepAlives: make(map[LeaseID]*keepAlive), - remote: pb.NewLeaseClient(c.conn), + remote: RetryLeaseClient(c), firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second, } if l.firstKeepAliveTimeout == time.Second { @@ -130,7 +130,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err for { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false)) + resp, err := l.remote.LeaseGrant(cctx, r) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -156,7 +156,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, for { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false)) + resp, err := l.remote.LeaseRevoke(cctx, r) if err == nil { return (*LeaseRevokeResponse)(resp), nil diff --git a/vendor/github.com/coreos/etcd/clientv3/retry.go b/vendor/github.com/coreos/etcd/clientv3/retry.go new file mode 100644 index 00000000000..3029ed8ea93 --- /dev/null +++ b/vendor/github.com/coreos/etcd/clientv3/retry.go @@ -0,0 +1,243 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type rpcFunc func(ctx context.Context) error +type retryRpcFunc func(context.Context, rpcFunc) + +func (c *Client) newRetryWrapper() retryRpcFunc { + return func(rpcCtx context.Context, f rpcFunc) { + for { + err := f(rpcCtx) + // ignore grpc conn closing on fail-fast calls; they are transient errors + if err == nil || !isConnClosing(err) { + return + } + select { + case <-c.balancer.ConnectNotify(): + case <-rpcCtx.Done(): + case <-c.ctx.Done(): + return + } + } + } +} + +type retryKVClient struct { + pb.KVClient + retryf retryRpcFunc +} + +// RetryKVClient implements a KVClient that uses the client's FailFast retry policy. +func RetryKVClient(c *Client) pb.KVClient { + return &retryKVClient{pb.NewKVClient(c.conn), c.retryWrapper} +} + +func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { + rkv.retryf(ctx, func(rctx context.Context) error { + resp, err = rkv.KVClient.Put(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) { + rkv.retryf(ctx, func(rctx context.Context) error { + resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) { + rkv.retryf(ctx, func(rctx context.Context) error { + resp, err = rkv.KVClient.Txn(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) { + rkv.retryf(ctx, func(rctx context.Context) error { + resp, err = rkv.KVClient.Compact(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryLeaseClient struct { + pb.LeaseClient + retryf retryRpcFunc +} + +// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy. +func RetryLeaseClient(c *Client) pb.LeaseClient { + return &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper} +} + +func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { + rlc.retryf(ctx, func(rctx context.Context) error { + resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...) + return err + }) + return resp, err + +} + +func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) { + rlc.retryf(ctx, func(rctx context.Context) error { + resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryClusterClient struct { + pb.ClusterClient + retryf retryRpcFunc +} + +// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy. +func RetryClusterClient(c *Client) pb.ClusterClient { + return &retryClusterClient{pb.NewClusterClient(c.conn), c.retryWrapper} +} + +func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) { + rcc.retryf(ctx, func(rctx context.Context) error { + resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) { + rcc.retryf(ctx, func(rctx context.Context) error { + resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) { + rcc.retryf(ctx, func(rctx context.Context) error { + resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...) + return err + }) + return resp, err +} + +type retryAuthClient struct { + pb.AuthClient + retryf retryRpcFunc +} + +// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy. +func RetryAuthClient(c *Client) pb.AuthClient { + return &retryAuthClient{pb.NewAuthClient(c.conn), c.retryWrapper} +} + +func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.UserAdd(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.UserDelete(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) { + rac.retryf(ctx, func(rctx context.Context) error { + resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...) + return err + }) + return resp, err +} diff --git a/vendor/github.com/coreos/etcd/clientv3/txn.go b/vendor/github.com/coreos/etcd/clientv3/txn.go index a76bd174c2e..a451e33acdd 100644 --- a/vendor/github.com/coreos/etcd/clientv3/txn.go +++ b/vendor/github.com/coreos/etcd/clientv3/txn.go @@ -19,7 +19,6 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" - "google.golang.org/grpc" ) // Txn is the interface that wraps mini-transactions. @@ -153,7 +152,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { func (txn *txn) commit() (*TxnResponse, error) { r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - resp, err := txn.kv.remote.Txn(txn.ctx, r, grpc.FailFast(false)) + resp, err := txn.kv.remote.Txn(txn.ctx, r) if err != nil { return nil, err } diff --git a/vendor/github.com/coreos/etcd/clientv3/watch.go b/vendor/github.com/coreos/etcd/clientv3/watch.go index aa2e6c351b0..afcc3b1afa2 100644 --- a/vendor/github.com/coreos/etcd/clientv3/watch.go +++ b/vendor/github.com/coreos/etcd/clientv3/watch.go @@ -242,6 +242,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch case reqc <- wr: ok = true case <-wr.ctx.Done(): + wgs.stopIfEmpty() case <-donec: if wgs.closeErr != nil { closeCh <- WatchResponse{closeErr: wgs.closeErr} @@ -285,7 +286,12 @@ func (w *watcher) Close() (err error) { } func (w *watchGrpcStream) Close() (err error) { - close(w.stopc) + w.mu.Lock() + if w.stopc != nil { + close(w.stopc) + w.stopc = nil + } + w.mu.Unlock() <-w.donec select { case err = <-w.errc: @@ -348,11 +354,13 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq // closeStream closes the watcher resources and removes it func (w *watchGrpcStream) closeStream(ws *watcherStream) { + w.mu.Lock() // cancels request stream; subscriber receives nil channel close(ws.initReq.retc) // close subscriber's channel close(ws.outc) delete(w.streams, ws.id) + w.mu.Unlock() } // run is the root of the goroutines for managing a watcher client @@ -371,6 +379,14 @@ func (w *watchGrpcStream) run() { w.cancel() }() + // already stopped? + w.mu.RLock() + stopc := w.stopc + w.mu.RUnlock() + if stopc == nil { + return + } + // start a stream with the etcd grpc server if wc, closeErr = w.newWatchClient(); closeErr != nil { return @@ -446,7 +462,7 @@ func (w *watchGrpcStream) run() { failedReq = pendingReq } cancelSet = make(map[int64]struct{}) - case <-w.stopc: + case <-stopc: return } @@ -586,12 +602,20 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) { } } - w.mu.Lock() w.closeStream(ws) - w.mu.Unlock() + w.stopIfEmpty() // lazily send cancel message if events on missing id } +func (wgs *watchGrpcStream) stopIfEmpty() { + wgs.mu.Lock() + if len(wgs.streams) == 0 && wgs.stopc != nil { + close(wgs.stopc) + wgs.stopc = nil + } + wgs.mu.Unlock() +} + func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { ws, rerr := w.resume() if rerr != nil { @@ -616,13 +640,14 @@ func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) { // openWatchClient retries opening a watchclient until retryConnection fails func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { - select { - case <-w.stopc: + w.mu.Lock() + stopc := w.stopc + w.mu.Unlock() + if stopc == nil { if err == nil { err = context.Canceled } return nil, err - default: } if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { break diff --git a/vendor/github.com/coreos/etcd/etcdserver/api/v2http/client_auth.go b/vendor/github.com/coreos/etcd/etcdserver/api/v2http/client_auth.go index 2b3278528f1..606e2e00b3e 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/api/v2http/client_auth.go +++ b/vendor/github.com/coreos/etcd/etcdserver/api/v2http/client_auth.go @@ -116,10 +116,11 @@ func hasKeyPrefixAccess(sec auth.Store, r *http.Request, key string, recursive, } var user *auth.User - if r.Header.Get("Authorization") == "" && clientCertAuthEnabled { - user = userFromClientCertificate(sec, r) + if r.Header.Get("Authorization") == "" { + if clientCertAuthEnabled { + user = userFromClientCertificate(sec, r) + } if user == nil { - plog.Warningf("auth: no authorization provided, checking guest access") return hasGuestAccess(sec, r, key) } } else { diff --git a/vendor/github.com/coreos/etcd/etcdserver/server.go b/vendor/github.com/coreos/etcd/etcdserver/server.go index e4943aeecf4..afbbfc656a9 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/server.go +++ b/vendor/github.com/coreos/etcd/etcdserver/server.go @@ -405,6 +405,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { srv.be = be srv.lessor = lease.NewLessor(srv.be) + + // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. + // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. + srv.lessor = lease.NewLessor(srv.be) srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex) if beExist { kvindex := srv.kv.ConsistentIndex() @@ -413,6 +417,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { } } srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) + srv.authStore = auth.NewAuthStore(srv.be) if h := cfg.AutoCompactionRetention; h != 0 { srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) @@ -658,6 +663,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { newbe := backend.NewDefaultBackend(fn) + // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. + // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. + if s.lessor != nil { + plog.Info("recovering lessor...") + s.lessor.Recover(newbe, s.kv) + plog.Info("finished recovering lessor") + } + plog.Info("restoring mvcc store...") if err := s.kv.Restore(newbe); err != nil { @@ -684,12 +697,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { s.be = newbe s.bemu.Unlock() - if s.lessor != nil { - plog.Info("recovering lessor...") - s.lessor.Recover(newbe, s.kv) - plog.Info("finished recovering lessor") - } - plog.Info("recovering alarms...") if err := s.restoreAlarms(); err != nil { plog.Panicf("restore alarms error: %v", err) diff --git a/vendor/github.com/coreos/etcd/integration/cluster.go b/vendor/github.com/coreos/etcd/integration/cluster.go index 4b34bf494f5..b63c1fc66af 100644 --- a/vendor/github.com/coreos/etcd/integration/cluster.go +++ b/vendor/github.com/coreos/etcd/integration/cluster.go @@ -752,6 +752,7 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { clus := &ClusterV3{ cluster: NewClusterByConfig(t, cfg), } + clus.Launch(t) for _, m := range clus.Members { client, err := NewClientV3(m) if err != nil { @@ -759,7 +760,6 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { } clus.clients = append(clus.clients, client) } - clus.Launch(t) return clus } diff --git a/vendor/github.com/coreos/etcd/lease/lessor.go b/vendor/github.com/coreos/etcd/lease/lessor.go index 13c3f1b128e..48147b2c12a 100644 --- a/vendor/github.com/coreos/etcd/lease/lessor.go +++ b/vendor/github.com/coreos/etcd/lease/lessor.go @@ -45,13 +45,18 @@ var ( type LeaseID int64 -// RangeDeleter defines an interface with DeleteRange method. +// RangeDeleter defines an interface with Txn and DeleteRange method. // We define this interface only for lessor to limit the number // of methods of mvcc.KV to what lessor actually needs. // // Having a minimum interface makes testing easy. type RangeDeleter interface { - DeleteRange(key, end []byte) (int64, int64) + // TxnBegin see comments on mvcc.KV + TxnBegin() int64 + // TxnEnd see comments on mvcc.KV + TxnEnd(txnID int64) error + // TxnDeleteRange see comments on mvcc.KV + TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) } // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. @@ -211,16 +216,30 @@ func (le *lessor) Revoke(id LeaseID) error { // unlock before doing external work le.mu.Unlock() - if le.rd != nil { - for item := range l.itemSet { - le.rd.DeleteRange([]byte(item.Key), nil) + if le.rd == nil { + return nil + } + + tid := le.rd.TxnBegin() + for item := range l.itemSet { + _, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil) + if err != nil { + panic(err) } } le.mu.Lock() defer le.mu.Unlock() delete(le.leaseMap, l.ID) - l.removeFrom(le.b) + // lease deletion needs to be in the same backend transaction with the + // kv deletion. Or we might end up with not executing the revoke or not + // deleting the keys if etcdserver fails in between. + le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) + + err := le.rd.TxnEnd(tid) + if err != nil { + panic(err) + } return nil } @@ -443,16 +462,7 @@ func (l Lease) persistTo(b backend.Backend) { b.BatchTx().Unlock() } -func (l Lease) removeFrom(b backend.Backend) { - key := int64ToBytes(int64(l.ID)) - - b.BatchTx().Lock() - b.BatchTx().UnsafeDelete(leaseBucketName, key) - b.BatchTx().Unlock() -} - -// refresh refreshes the expiry of the lease. It extends the expiry at least -// minLeaseTTL second. +// refresh refreshes the expiry of the lease. func (l *Lease) refresh(extend time.Duration) { if l.TTL < minLeaseTTL { l.TTL = minLeaseTTL diff --git a/vendor/github.com/coreos/etcd/mvcc/kvstore.go b/vendor/github.com/coreos/etcd/mvcc/kvstore.go index d1c71fd7c1a..adc1fb7661e 100644 --- a/vendor/github.com/coreos/etcd/mvcc/kvstore.go +++ b/vendor/github.com/coreos/etcd/mvcc/kvstore.go @@ -367,6 +367,8 @@ func (s *store) restore() error { revToBytes(revision{main: 1}, min) revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) + keyToLease := make(map[string]lease.LeaseID) + // restore index tx := s.b.BatchTx() tx.Lock() @@ -390,26 +392,15 @@ func (s *store) restore() error { switch { case isTombstone(key): s.kvindex.Tombstone(kv.Key, rev) - if lease.LeaseID(kv.Lease) != lease.NoLease { - err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) - if err != nil && err != lease.ErrLeaseNotFound { - plog.Fatalf("unexpected Detach error %v", err) - } - } + delete(keyToLease, string(kv.Key)) + default: s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) - if lease.LeaseID(kv.Lease) != lease.NoLease { - if s.le == nil { - panic("no lessor to attach lease") - } - err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) - // We are walking through the kv history here. It is possible that we attached a key to - // the lease and the lease was revoked later. - // Thus attaching an old version of key to a none existing lease is possible here, and - // we should just ignore the error. - if err != nil && err != lease.ErrLeaseNotFound { - panic("unexpected Attach error") - } + + if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease { + keyToLease[string(kv.Key)] = lid + } else { + delete(keyToLease, string(kv.Key)) } } @@ -417,6 +408,16 @@ func (s *store) restore() error { s.currentRev = rev } + for key, lid := range keyToLease { + if s.le == nil { + panic("no lessor to attach lease") + } + err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) + if err != nil { + plog.Errorf("unexpected Attach error: %v", err) + } + } + _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) scheduledCompact := int64(0) if len(scheduledCompactBytes) != 0 { @@ -497,7 +498,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool break } } - return kvs, len(kvs), curRev, nil + return kvs, len(revpairs), curRev, nil } func (s *store) put(key, value []byte, leaseID lease.LeaseID) { @@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) { err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) if err != nil { - panic("unexpected error from lease detach") + plog.Errorf("unexpected error from lease detach: %v", err) } } @@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) { if lease.LeaseID(kv.Lease) != lease.NoLease { err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) if err != nil { - plog.Fatalf("cannot detach %v", err) + plog.Errorf("cannot detach %v", err) } } } diff --git a/vendor/github.com/coreos/etcd/pkg/transport/listener.go b/vendor/github.com/coreos/etcd/pkg/transport/listener.go index d94757b2c52..46fe12009db 100644 --- a/vendor/github.com/coreos/etcd/pkg/transport/listener.go +++ b/vendor/github.com/coreos/etcd/pkg/transport/listener.go @@ -64,6 +64,9 @@ type TLSInfo struct { TrustedCAFile string ClientCertAuth bool + // ServerName ensures the cert matches the given host in case of discovery / virtual hosting + ServerName string + selfCert bool // parseFunc exists to simplify testing. Typically, parseFunc @@ -164,6 +167,7 @@ func (info TLSInfo) baseConfig() (*tls.Config, error) { cfg := &tls.Config{ Certificates: []tls.Certificate{*tlsCert}, MinVersion: tls.VersionTLS12, + ServerName: info.ServerName, } return cfg, nil } @@ -215,7 +219,7 @@ func (info TLSInfo) ClientConfig() (*tls.Config, error) { return nil, err } } else { - cfg = &tls.Config{} + cfg = &tls.Config{ServerName: info.ServerName} } CAFiles := info.cafiles() @@ -224,6 +228,8 @@ func (info TLSInfo) ClientConfig() (*tls.Config, error) { if err != nil { return nil, err } + // if given a CA, trust any host with a cert signed by the CA + cfg.ServerName = "" } if info.selfCert { diff --git a/vendor/github.com/coreos/etcd/pkg/transport/tls.go b/vendor/github.com/coreos/etcd/pkg/transport/tls.go new file mode 100644 index 00000000000..62fe0d38519 --- /dev/null +++ b/vendor/github.com/coreos/etcd/pkg/transport/tls.go @@ -0,0 +1,49 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +import ( + "fmt" + "strings" + "time" +) + +// ValidateSecureEndpoints scans the given endpoints against tls info, returning only those +// endpoints that could be validated as secure. +func ValidateSecureEndpoints(tlsInfo TLSInfo, eps []string) ([]string, error) { + t, err := NewTransport(tlsInfo, 5*time.Second) + if err != nil { + return nil, err + } + var errs []string + var endpoints []string + for _, ep := range eps { + if !strings.HasPrefix(ep, "https://") { + errs = append(errs, fmt.Sprintf("%q is insecure", ep)) + continue + } + conn, cerr := t.Dial("tcp", ep[len("https://"):]) + if cerr != nil { + errs = append(errs, fmt.Sprintf("%q failed to dial (%v)", ep, cerr)) + continue + } + conn.Close() + endpoints = append(endpoints, ep) + } + if len(errs) != 0 { + err = fmt.Errorf("%s", strings.Join(errs, ",")) + } + return endpoints, err +} diff --git a/vendor/github.com/coreos/etcd/rafthttp/stream.go b/vendor/github.com/coreos/etcd/rafthttp/stream.go index 07bc1c20474..76e731e91d0 100644 --- a/vendor/github.com/coreos/etcd/rafthttp/stream.go +++ b/vendor/github.com/coreos/etcd/rafthttp/stream.go @@ -332,7 +332,16 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { default: plog.Panicf("unhandled stream type %s", t) } - cr.closer = rc + select { + case <-cr.stopc: + cr.mu.Unlock() + if err := rc.Close(); err != nil { + return err + } + return io.EOF + default: + cr.closer = rc + } cr.mu.Unlock() for { diff --git a/vendor/github.com/coreos/etcd/version/version.go b/vendor/github.com/coreos/etcd/version/version.go index 01bdd6933c1..224dfe43222 100644 --- a/vendor/github.com/coreos/etcd/version/version.go +++ b/vendor/github.com/coreos/etcd/version/version.go @@ -29,7 +29,7 @@ import ( var ( // MinClusterVersion is the min cluster version this etcd binary is compatible with. MinClusterVersion = "2.3.0" - Version = "3.0.4" + Version = "3.0.6" // Git SHA Value will be set during build GitSHA = "Not provided (use ./build instead of go build)"