Update godeps for etcd 3.0.4

This commit is contained in:
Timothy St. Clair
2016-07-22 13:54:40 -05:00
parent 456c43c22d
commit 5f008faa8b
457 changed files with 25492 additions and 10481 deletions

View File

@@ -40,9 +40,11 @@ if err != nil {
// use the response
```
etcd uses go's `vendor` directory to manage external dependencies. If `clientv3` is imported
outside of etcd, simply copy `clientv3` to the `vendor` directory or use tools like godep to
manage your own dependency, as in [vendor directories](https://golang.org/cmd/go/#hdr-Vendor_Directories).
etcd uses `cmd/vendor` directory to store external dependencies, which are
to be compiled into etcd release binaries. `client` can be imported without
vendoring. For full compatibility, it is recommended to vendor builds using
etcd's vendored packages, using tools like godep, as in
[vendor directories](https://golang.org/cmd/go/#hdr-Vendor_Directories).
For more detail, please read [Go vendor design](https://golang.org/s/go15vendor).
## Error Handling
@@ -50,21 +52,22 @@ For more detail, please read [Go vendor design](https://golang.org/s/go15vendor)
etcd client returns 2 types of errors:
1. context error: canceled or deadline exceeded.
2. gRPC error: see [v3rpc/error](https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/error.go).
2. gRPC error: see [api/v3rpc/rpctypes](https://godoc.org/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes).
Here is the example code to handle client errors:
```go
resp, err := kvc.Put(ctx, "", "")
if err != nil {
if err == context.Canceled {
// ctx is canceled by another routine
} else if err == context.DeadlineExceeded {
// ctx is attached with a deadline and it exceeded
} else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok {
// process (verr.Errors)
} else {
// bad cluster endpoints, which are not etcd servers
switch err {
case context.Canceled:
log.Fatalf("ctx is canceled by another routine: %v", err)
case context.DeadlineExceeded:
log.Fatalf("ctx is attached with a deadline is exceeded: %v", err)
case rpctypes.ErrEmptyKey:
log.Fatalf("client-side error: %v", err)
default:
log.Fatalf("bad cluster endpoints, which are not etcd servers: %v", err)
}
}
```

View File

@@ -1,4 +1,4 @@
// Copyright 2016 Nippon Telegraph and Telephone Corporation.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,23 +15,49 @@
package clientv3
import (
"fmt"
"strings"
"github.com/coreos/etcd/auth/authpb"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type (
AuthEnableResponse pb.AuthEnableResponse
AuthUserAddResponse pb.AuthUserAddResponse
AuthUserDeleteResponse pb.AuthUserDeleteResponse
AuthUserChangePasswordResponse pb.AuthUserChangePasswordResponse
AuthRoleAddResponse pb.AuthRoleAddResponse
AuthEnableResponse pb.AuthEnableResponse
AuthDisableResponse pb.AuthDisableResponse
AuthenticateResponse pb.AuthenticateResponse
AuthUserAddResponse pb.AuthUserAddResponse
AuthUserDeleteResponse pb.AuthUserDeleteResponse
AuthUserChangePasswordResponse pb.AuthUserChangePasswordResponse
AuthUserGrantRoleResponse pb.AuthUserGrantRoleResponse
AuthUserGetResponse pb.AuthUserGetResponse
AuthUserRevokeRoleResponse pb.AuthUserRevokeRoleResponse
AuthRoleAddResponse pb.AuthRoleAddResponse
AuthRoleGrantPermissionResponse pb.AuthRoleGrantPermissionResponse
AuthRoleGetResponse pb.AuthRoleGetResponse
AuthRoleRevokePermissionResponse pb.AuthRoleRevokePermissionResponse
AuthRoleDeleteResponse pb.AuthRoleDeleteResponse
AuthUserListResponse pb.AuthUserListResponse
AuthRoleListResponse pb.AuthRoleListResponse
PermissionType authpb.Permission_Type
)
const (
PermRead = authpb.READ
PermWrite = authpb.WRITE
PermReadWrite = authpb.READWRITE
)
type Auth interface {
// AuthEnable enables auth of an etcd cluster.
AuthEnable(ctx context.Context) (*AuthEnableResponse, error)
// AuthDisable disables auth of an etcd cluster.
AuthDisable(ctx context.Context) (*AuthDisableResponse, error)
// UserAdd adds a new user to an etcd cluster.
UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error)
@@ -41,8 +67,35 @@ type Auth interface {
// UserChangePassword changes a password of a user.
UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error)
// RoleAdd adds a new user to an etcd cluster.
// UserGrantRole grants a role to a user.
UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error)
// UserGet gets a detailed information of a user.
UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error)
// UserList gets a list of all users.
UserList(ctx context.Context) (*AuthUserListResponse, error)
// UserRevokeRole revokes a role of a user.
UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error)
// RoleAdd adds a new role to an etcd cluster.
RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error)
// RoleGrantPermission grants a permission to a role.
RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error)
// RoleGet gets a detailed information of a role.
RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error)
// RoleList gets a list of all roles.
RoleList(ctx context.Context) (*AuthRoleListResponse, error)
// RoleRevokePermission revokes a permission from a role.
RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error)
// RoleDelete deletes a role.
RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error)
}
type auth struct {
@@ -62,26 +115,115 @@ func NewAuth(c *Client) Auth {
}
func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
return (*AuthEnableResponse)(resp), err
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false))
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}
func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false))
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password})
return (*AuthUserAddResponse)(resp), err
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name})
return (*AuthUserDeleteResponse)(resp), err
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false))
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password})
return (*AuthUserChangePasswordResponse)(resp), err
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false))
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, grpc.FailFast(false))
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, grpc.FailFast(false))
return (*AuthUserListResponse)(resp), toErr(ctx, err)
}
func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false))
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name})
return (*AuthRoleAddResponse)(resp), err
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false))
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
perm := &authpb.Permission{
Key: []byte(key),
RangeEnd: []byte(rangeEnd),
PermType: authpb.Permission_Type(permType),
}
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, grpc.FailFast(false))
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, grpc.FailFast(false))
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, grpc.FailFast(false))
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false))
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
}
func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false))
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
}
func StrToPermissionType(s string) (PermissionType, error) {
val, ok := authpb.Permission_Type_value[strings.ToUpper(s)]
if ok {
return PermissionType(val), nil
}
return PermissionType(-1), fmt.Errorf("invalid permission type: %s", s)
}
type authenticator struct {
conn *grpc.ClientConn // conn in-use
remote pb.AuthClient
}
func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthenticateResponse)(resp), toErr(ctx, err)
}
func (auth *authenticator) close() {
auth.conn.Close()
}
func newAuthenticator(endpoint string, opts []grpc.DialOption) (*authenticator, error) {
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return nil, err
}
return &authenticator{
conn: conn,
remote: pb.NewAuthClient(conn),
}, nil
}

64
vendor/github.com/coreos/etcd/clientv3/balancer.go generated vendored Normal file
View File

@@ -0,0 +1,64 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"net/url"
"strings"
"sync/atomic"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
// eps are the client's endpoints stripped of any URL scheme
eps []string
ch chan []grpc.Address
numGets uint32
}
func newSimpleBalancer(eps []string) grpc.Balancer {
ch := make(chan []grpc.Address, 1)
addrs := make([]grpc.Address, len(eps))
for i := range eps {
addrs[i].Addr = getHost(eps[i])
}
ch <- addrs
return &simpleBalancer{eps: eps, ch: ch}
}
func (b *simpleBalancer) Start(target string) error { return nil }
func (b *simpleBalancer) Up(addr grpc.Address) func(error) { return func(error) {} }
func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
v := atomic.AddUint32(&b.numGets, 1)
ep := b.eps[v%uint32(len(b.eps))]
return grpc.Address{Addr: getHost(ep)}, func() {}, nil
}
func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.ch }
func (b *simpleBalancer) Close() error {
close(b.ch)
return nil
}
func getHost(ep string) string {
url, uerr := url.Parse(ep)
if uerr != nil || !strings.Contains(ep, "://") {
return ep
}
return url.Host
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,18 +15,22 @@
package clientv3
import (
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
var (
@@ -42,21 +46,21 @@ type Client struct {
Auth
Maintenance
conn *grpc.ClientConn
cfg Config
creds *credentials.TransportAuthenticator
mu sync.RWMutex // protects connection selection and error list
errors []error // errors passed to retryConnection
conn *grpc.ClientConn
cfg Config
creds *credentials.TransportCredentials
ctx context.Context
cancel context.CancelFunc
// Username is a username for authentication
Username string
// Password is a password for authentication
Password string
}
// New creates a new etcdv3 client from a given configuration.
func New(cfg Config) (*Client, error) {
if cfg.RetryDialer == nil {
cfg.RetryDialer = dialEndpointList
}
if len(cfg.Endpoints) == 0 {
return nil, ErrNoAvailableEndpoints
}
@@ -80,17 +84,8 @@ func NewFromConfigFile(path string) (*Client, error) {
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.mu.Lock()
if c.cancel == nil {
c.mu.Unlock()
return nil
}
c.cancel()
c.cancel = nil
c.mu.Unlock()
c.Watcher.Close()
c.Lease.Close()
return c.conn.Close()
return toErr(c.ctx, c.conn.Close())
}
// Ctx is a context for "out of band" messages (e.g., for sending
@@ -101,72 +96,157 @@ func (c *Client) Ctx() context.Context { return c.ctx }
// Endpoints lists the registered endpoints for the client.
func (c *Client) Endpoints() []string { return c.cfg.Endpoints }
// Errors returns all errors that have been observed since called last.
func (c *Client) Errors() (errs []error) {
c.mu.Lock()
defer c.mu.Unlock()
errs = c.errors
c.errors = nil
return errs
type authTokenCredential struct {
token string
}
// Dial establishes a connection for a given endpoint using the client's config
func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
func (cred authTokenCredential) RequireTransportSecurity() bool {
return false
}
func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
return map[string]string{
"token": cred.token,
}, nil
}
func (c *Client) dialTarget(endpoint string) (proto string, host string, creds *credentials.TransportCredentials) {
proto = "tcp"
host = endpoint
creds = c.creds
url, uerr := url.Parse(endpoint)
if uerr != nil || !strings.Contains(endpoint, "://") {
return
}
// strip scheme:// prefix since grpc dials by host
host = url.Host
switch url.Scheme {
case "unix":
proto = "unix"
case "http":
creds = nil
case "https":
if creds != nil {
break
}
tlsconfig := &tls.Config{}
emptyCreds := credentials.NewTLS(tlsconfig)
creds = &emptyCreds
default:
return "", "", nil
}
return
}
// dialSetupOpts gives the dial opts prioer to any authentication
func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) []grpc.DialOption {
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTimeout(c.cfg.DialTimeout),
}
if c.creds != nil {
opts = append(opts, grpc.WithTransportCredentials(*c.creds))
} else {
opts = append(opts, grpc.WithInsecure())
opts = append(opts, dopts...)
// grpc issues TLS cert checks using the string passed into dial so
// that string must be the host. To recover the full scheme://host URL,
// have a map from hosts to the original endpoint.
host2ep := make(map[string]string)
for i := range c.cfg.Endpoints {
_, host, _ := c.dialTarget(c.cfg.Endpoints[i])
host2ep[host] = c.cfg.Endpoints[i]
}
proto := "tcp"
if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" {
proto = "unix"
// strip unix:// prefix so certs work
endpoint = url.Host
}
f := func(a string, t time.Duration) (net.Conn, error) {
f := func(host string, t time.Duration) (net.Conn, error) {
proto, host, _ := c.dialTarget(host2ep[host])
if proto == "" {
return nil, fmt.Errorf("unknown scheme for %q", host)
}
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
default:
}
return net.DialTimeout(proto, a, t)
return net.DialTimeout(proto, host, t)
}
opts = append(opts, grpc.WithDialer(f))
conn, err := grpc.Dial(endpoint, opts...)
_, _, creds := c.dialTarget(endpoint)
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(*creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
return opts
}
// Dial connects to a single endpoint using the client's config.
func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
return c.dial(endpoint)
}
func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts := c.dialSetupOpts(endpoint, dopts...)
host := getHost(endpoint)
if c.Username != "" && c.Password != "" {
// use dial options without dopts to avoid reusing the client balancer
auth, err := newAuthenticator(host, c.dialSetupOpts(endpoint))
if err != nil {
return nil, err
}
defer auth.close()
resp, err := auth.authenticate(c.ctx, c.Username, c.Password)
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithPerRPCCredentials(authTokenCredential{token: resp.Token}))
}
conn, err := grpc.Dial(host, opts...)
if err != nil {
return nil, err
}
return conn, nil
}
// WithRequireLeader requires client requests to only succeed
// when the cluster has a leader.
func WithRequireLeader(ctx context.Context) context.Context {
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
return metadata.NewContext(ctx, md)
}
func newClient(cfg *Config) (*Client, error) {
if cfg == nil {
cfg = &Config{RetryDialer: dialEndpointList}
cfg = &Config{}
}
var creds *credentials.TransportAuthenticator
var creds *credentials.TransportCredentials
if cfg.TLS != nil {
c := credentials.NewTLS(cfg.TLS)
creds = &c
}
// use a temporary skeleton client to bootstrap first connection
ctx, cancel := context.WithCancel(context.TODO())
conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds, ctx: ctx})
if err != nil {
return nil, err
}
client := &Client{
conn: conn,
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
}
if cfg.Username != "" && cfg.Password != "" {
client.Username = cfg.Username
client.Password = cfg.Password
}
b := newSimpleBalancer(cfg.Endpoints)
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(b))
if err != nil {
return nil, err
}
client.conn = conn
client.Cluster = NewCluster(client)
client.KV = NewKV(client)
client.Lease = NewLease(client)
@@ -184,60 +264,35 @@ func newClient(cfg *Config) (*Client, error) {
}
// ActiveConnection returns the current in-use connection
func (c *Client) ActiveConnection() *grpc.ClientConn {
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn
}
func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
// retryConnection establishes a new connection
func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) {
c.mu.Lock()
defer c.mu.Unlock()
if err != nil {
c.errors = append(c.errors, err)
}
if c.cancel == nil {
return nil, c.ctx.Err()
}
if oldConn != c.conn {
// conn has already been updated
return c.conn, nil
}
oldConn.Close()
if st, _ := oldConn.State(); st != grpc.Shutdown {
// wait for shutdown so grpc doesn't leak sleeping goroutines
oldConn.WaitForStateChange(c.ctx, st)
}
conn, dialErr := c.cfg.RetryDialer(c)
if dialErr != nil {
c.errors = append(c.errors, dialErr)
return nil, dialErr
}
c.conn = conn
return c.conn, nil
}
// dialEndpointList attempts to connect to each endpoint in order until a
// connection is established.
func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
var err error
for _, ep := range c.Endpoints() {
conn, curErr := c.Dial(ep)
if curErr != nil {
err = curErr
} else {
return conn, nil
}
}
return nil, err
}
// isHalted returns true if the given error and context indicate no forward
// isHaltErr returns true if the given error and context indicate no forward
// progress can be made, even after reconnecting.
func isHalted(ctx context.Context, err error) bool {
isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ")
return isRPCError || ctx.Err() != nil
func isHaltErr(ctx context.Context, err error) bool {
if ctx != nil && ctx.Err() != nil {
return true
}
if err == nil {
return false
}
eErr := rpctypes.Error(err)
if _, ok := eErr.(rpctypes.EtcdError); ok {
return eErr != rpctypes.ErrStopped && eErr != rpctypes.ErrNoLeader
}
// treat etcdserver errors not recognized by the client as halting
return strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) ||
strings.Contains(err.Error(), "etcdserver:")
}
func toErr(ctx context.Context, err error) error {
if err == nil {
return nil
}
err = rpctypes.Error(err)
if ctx.Err() != nil && strings.Contains(err.Error(), "context") {
err = ctx.Err()
} else if strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) {
err = grpc.ErrClientConnClosing
}
return err
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,8 +15,6 @@
package clientv3
import (
"sync"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -34,9 +32,6 @@ type Cluster interface {
// MemberList lists the current cluster membership.
MemberList(ctx context.Context) (*MemberListResponse, error)
// MemberLeader returns the current leader member.
MemberLeader(ctx context.Context) (*Member, error)
// MemberAdd adds a new member into the cluster.
MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
@@ -48,70 +43,47 @@ type Cluster interface {
}
type cluster struct {
c *Client
mu sync.Mutex
conn *grpc.ClientConn // conn in-use
remote pb.ClusterClient
}
func NewCluster(c *Client) Cluster {
conn := c.ActiveConnection()
return &cluster{
c: c,
conn: conn,
remote: pb.NewClusterClient(conn),
}
return &cluster{remote: pb.NewClusterClient(c.conn)}
}
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
resp, err := c.getRemote().MemberAdd(ctx, r)
resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false))
if err == nil {
return (*MemberAddResponse)(resp), nil
}
if isHalted(ctx, err) {
return nil, err
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
go c.switchRemote(err)
return nil, err
return nil, toErr(ctx, err)
}
func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
r := &pb.MemberRemoveRequest{ID: id}
resp, err := c.getRemote().MemberRemove(ctx, r)
resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false))
if err == nil {
return (*MemberRemoveResponse)(resp), nil
}
if isHalted(ctx, err) {
return nil, err
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
go c.switchRemote(err)
return nil, err
return nil, toErr(ctx, err)
}
func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
// it is safe to retry on update.
for {
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
resp, err := c.getRemote().MemberUpdate(ctx, r)
resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
if isHalted(ctx, err) {
return nil, err
}
err = c.switchRemote(err)
if err != nil {
return nil, err
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
}
}
@@ -119,52 +91,12 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
// it is safe to retry on list.
for {
resp, err := c.getRemote().MemberList(ctx, &pb.MemberListRequest{})
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false))
if err == nil {
return (*MemberListResponse)(resp), nil
}
if isHalted(ctx, err) {
return nil, err
}
err = c.switchRemote(err)
if err != nil {
return nil, err
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
}
}
func (c *cluster) MemberLeader(ctx context.Context) (*Member, error) {
resp, err := c.MemberList(ctx)
if err != nil {
return nil, err
}
for _, m := range resp.Members {
if m.IsLeader {
return (*Member)(m), nil
}
}
return nil, nil
}
func (c *cluster) getRemote() pb.ClusterClient {
c.mu.Lock()
defer c.mu.Unlock()
return c.remote
}
func (c *cluster) switchRemote(prevErr error) error {
newConn, err := c.c.retryConnection(c.conn, prevErr)
if err != nil {
return err
}
c.mu.Lock()
defer c.mu.Unlock()
c.conn = newConn
c.remote = pb.NewClusterClient(c.conn)
return nil
}

53
vendor/github.com/coreos/etcd/clientv3/compact_op.go generated vendored Normal file
View File

@@ -0,0 +1,53 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// CompactOp represents a compact operation.
type CompactOp struct {
revision int64
physical bool
}
// CompactOption configures compact operation.
type CompactOption func(*CompactOp)
func (op *CompactOp) applyCompactOpts(opts []CompactOption) {
for _, opt := range opts {
opt(op)
}
}
// OpCompact wraps slice CompactOption to create a CompactOp.
func OpCompact(rev int64, opts ...CompactOption) CompactOp {
ret := CompactOp{revision: rev}
ret.applyCompactOpts(opts)
return ret
}
func (op CompactOp) toRequest() *pb.CompactionRequest {
return &pb.CompactionRequest{Revision: op.revision, Physical: op.physical}
}
// WithCompactPhysical makes compact RPC call wait until
// the compaction is physically applied to the local database
// such that compacted entries are totally removed from the
// backend database.
func WithCompactPhysical() CompactOption {
return func(op *CompactOp) { op.physical = true }
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -22,19 +22,12 @@ import (
"github.com/coreos/etcd/pkg/tlsutil"
"github.com/ghodss/yaml"
"google.golang.org/grpc"
)
// EndpointDialer is a policy for choosing which endpoint to dial next
type EndpointDialer func(*Client) (*grpc.ClientConn, error)
type Config struct {
// Endpoints is a list of URLs
Endpoints []string
// RetryDialer chooses the next endpoint to use
RetryDialer EndpointDialer
// DialTimeout is the timeout for failing to establish a connection.
DialTimeout time.Duration
@@ -43,9 +36,15 @@ type Config struct {
// Logger is the logger used by client library.
Logger Logger
// Username is a username for authentication
Username string
// Password is a password for authentication
Password string
}
type YamlConfig struct {
type yamlConfig struct {
Endpoints []string `json:"endpoints"`
DialTimeout time.Duration `json:"dial-timeout"`
InsecureTransport bool `json:"insecure-transport"`
@@ -61,7 +60,7 @@ func configFromFile(fpath string) (*Config, error) {
return nil, err
}
yc := &YamlConfig{}
yc := &yamlConfig{}
err = yaml.Unmarshal(b, yc)
if err != nil {

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
// Copyright 2015 CoreOS, Inc.
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,18 +15,17 @@
package clientv3
import (
"sync"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type (
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
type KV interface {
@@ -50,7 +49,7 @@ type KV interface {
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64) error
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
// Do is useful when declaring operations to be issued at a later time
@@ -61,7 +60,7 @@ type KV interface {
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
@@ -74,54 +73,39 @@ type OpResponse struct {
del *DeleteResponse
}
type kv struct {
c *Client
func (op OpResponse) Put() *PutResponse { return op.put }
func (op OpResponse) Get() *GetResponse { return op.get }
func (op OpResponse) Del() *DeleteResponse { return op.del }
mu sync.Mutex // guards all fields
conn *grpc.ClientConn // conn in-use
type kv struct {
remote pb.KVClient
}
func NewKV(c *Client) KV {
conn := c.ActiveConnection()
remote := pb.NewKVClient(conn)
return &kv{
conn: c.ActiveConnection(),
remote: remote,
c: c,
}
return &kv{remote: pb.NewKVClient(c.conn)}
}
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, err
return r.put, toErr(ctx, err)
}
func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.Do(ctx, OpGet(key, opts...))
return r.get, err
return r.get, toErr(ctx, err)
}
func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, err
return r.del, toErr(ctx, err)
}
func (kv *kv) Compact(ctx context.Context, rev int64) error {
r := &pb.CompactionRequest{Revision: rev}
_, err := kv.getRemote().Compact(ctx, r)
if err == nil {
return nil
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), grpc.FailFast(false))
if err != nil {
return nil, toErr(ctx, err)
}
if isHalted(ctx, err) {
return err
}
go kv.switchRemote(err)
return err
return (*CompactResponse)(resp), err
}
func (kv *kv) Txn(ctx context.Context) Txn {
@@ -133,75 +117,60 @@ func (kv *kv) Txn(ctx context.Context) Txn {
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
for {
var err error
switch op.t {
// TODO: handle other ops
case tRange:
var resp *pb.RangeResponse
r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev, Serializable: op.serializable}
if op.sort != nil {
r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}
resp, err = kv.getRemote().Range(ctx, r)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
resp, err = kv.getRemote().Put(ctx, r)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
resp, err = kv.getRemote().DeleteRange(ctx, r)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
default:
panic("Unknown op")
resp, err := kv.do(ctx, op)
if err == nil {
return resp, nil
}
if isHalted(ctx, err) {
return OpResponse{}, err
if isHaltErr(ctx, err) {
return resp, toErr(ctx, err)
}
// do not retry on modifications
if op.isWrite() {
go kv.switchRemote(err)
return OpResponse{}, err
}
if nerr := kv.switchRemote(err); nerr != nil {
return OpResponse{}, nerr
return resp, toErr(ctx, err)
}
}
}
func (kv *kv) switchRemote(prevErr error) error {
// Usually it's a bad idea to lock on network i/o but here it's OK
// since the link is down and new requests can't be processed anyway.
// Likewise, if connecting stalls, closing the Client can break the
// lock via context cancelation.
kv.mu.Lock()
defer kv.mu.Unlock()
func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
// TODO: handle other ops
case tRange:
var resp *pb.RangeResponse
r := &pb.RangeRequest{
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
CountOnly: op.countOnly,
}
if op.sort != nil {
r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}
newConn, err := kv.c.retryConnection(kv.conn, prevErr)
if err != nil {
return err
resp, err = kv.remote.Range(ctx, r, grpc.FailFast(false))
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false))
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false))
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
default:
panic("Unknown op")
}
kv.conn = newConn
kv.remote = pb.NewKVClient(kv.conn)
return nil
}
func (kv *kv) getRemote() pb.KVClient {
kv.mu.Lock()
defer kv.mu.Unlock()
return kv.remote
return OpResponse{}, err
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -18,19 +18,36 @@ import (
"sync"
"time"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type (
LeaseGrantResponse pb.LeaseGrantResponse
LeaseRevokeResponse pb.LeaseRevokeResponse
LeaseKeepAliveResponse pb.LeaseKeepAliveResponse
LeaseID int64
LeaseRevokeResponse pb.LeaseRevokeResponse
LeaseID int64
)
// LeaseGrantResponse is used to convert the protobuf grant response.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}
// LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
type LeaseKeepAliveResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
}
const (
// defaultTTL is the assumed lease TTL used for the first keepalive
// deadline before the actual TTL is known to the client.
defaultTTL = 5 * time.Second
// a small buffer to store unsent lease responses.
leaseResponseChSize = 16
// NoLease is a lease ID for the absence of a lease.
@@ -57,10 +74,7 @@ type Lease interface {
}
type lessor struct {
c *Client
mu sync.Mutex // guards all fields
conn *grpc.ClientConn // conn in-use
mu sync.Mutex // guards all fields
// donec is closed when recvKeepAliveLoop stops
donec chan struct{}
@@ -74,32 +88,38 @@ type lessor struct {
stopCancel context.CancelFunc
keepAlives map[LeaseID]*keepAlive
// firstKeepAliveTimeout is the timeout for the first keepalive request
// before the actual TTL is known to the lease client
firstKeepAliveTimeout time.Duration
}
// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
chs []chan<- *LeaseKeepAliveResponse
ctxs []context.Context
// deadline is the next time to send a keep alive message
// deadline is the time the keep alive channels close if no response
deadline time.Time
// nextKeepAlive is when to send the next keep alive message
nextKeepAlive time.Time
// donec is closed on lease revoke, expiration, or cancel.
donec chan struct{}
}
func NewLease(c *Client) Lease {
l := &lessor{
c: c,
conn: c.ActiveConnection(),
donec: make(chan struct{}),
keepAlives: make(map[LeaseID]*keepAlive),
donec: make(chan struct{}),
keepAlives: make(map[LeaseID]*keepAlive),
remote: pb.NewLeaseClient(c.conn),
firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
}
if l.firstKeepAliveTimeout == time.Second {
l.firstKeepAliveTimeout = defaultTTL
}
l.remote = pb.NewLeaseClient(l.conn)
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
go l.recvKeepAliveLoop()
go l.deadlineLoop()
return l
}
@@ -110,14 +130,20 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
for {
r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.getRemote().LeaseGrant(cctx, r)
resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false))
if err == nil {
return (*LeaseGrantResponse)(resp), nil
gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
Error: resp.Error,
}
return gresp, nil
}
if isHalted(cctx, err) {
return nil, err
if isHaltErr(cctx, err) {
return nil, toErr(ctx, err)
}
if nerr := l.switchRemoteAndStream(err); nerr != nil {
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
@@ -130,16 +156,15 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
for {
r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.getRemote().LeaseRevoke(cctx, r)
resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false))
if err == nil {
return (*LeaseRevokeResponse)(resp), nil
}
if isHalted(ctx, err) {
return nil, err
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
if nerr := l.switchRemoteAndStream(err); nerr != nil {
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
@@ -153,10 +178,11 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
if !ok {
// create fresh keep alive
ka = &keepAlive{
chs: []chan<- *LeaseKeepAliveResponse{ch},
ctxs: []context.Context{ctx},
deadline: time.Now(),
donec: make(chan struct{}),
chs: []chan<- *LeaseKeepAliveResponse{ch},
ctxs: []context.Context{ctx},
deadline: time.Now().Add(l.firstKeepAliveTimeout),
nextKeepAlive: time.Now(),
donec: make(chan struct{}),
}
l.keepAlives[id] = ka
} else {
@@ -179,11 +205,16 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
for {
resp, err := l.keepAliveOnce(cctx, id)
if err == nil {
if resp.TTL == 0 {
err = rpctypes.ErrLeaseNotFound
}
return resp, err
}
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
nerr := l.switchRemoteAndStream(err)
if nerr != nil {
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
@@ -228,26 +259,34 @@ func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-cha
}
func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
stream, err := l.getRemote().LeaseKeepAlive(ctx)
cctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
if err != nil {
return nil, err
return nil, toErr(ctx, err)
}
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
if err != nil {
return nil, err
return nil, toErr(ctx, err)
}
resp, rerr := stream.Recv()
if rerr != nil {
return nil, rerr
return nil, toErr(ctx, rerr)
}
return (*LeaseKeepAliveResponse)(resp), nil
karesp := &LeaseKeepAliveResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
}
return karesp, nil
}
func (l *lessor) recvKeepAliveLoop() {
defer func() {
l.stopCancel()
l.mu.Lock()
close(l.donec)
for _, ka := range l.keepAlives {
@@ -261,7 +300,7 @@ func (l *lessor) recvKeepAliveLoop() {
for serr == nil {
resp, err := stream.Recv()
if err != nil {
if isHalted(l.stopCtx, err) {
if isHaltErr(l.stopCtx, err) {
return
}
stream, serr = l.resetRecv()
@@ -273,7 +312,7 @@ func (l *lessor) recvKeepAliveLoop() {
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
if err := l.switchRemoteAndStream(nil); err != nil {
if err := l.newStream(); err != nil {
return nil, err
}
stream := l.getKeepAliveStream()
@@ -283,39 +322,68 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
id := LeaseID(resp.ID)
karesp := &LeaseKeepAliveResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
}
l.mu.Lock()
defer l.mu.Unlock()
ka, ok := l.keepAlives[id]
ka, ok := l.keepAlives[karesp.ID]
if !ok {
return
}
if resp.TTL <= 0 {
if karesp.TTL <= 0 {
// lease expired; close all keep alive channels
delete(l.keepAlives, id)
delete(l.keepAlives, karesp.ID)
ka.Close()
return
}
// send update to all channels
nextDeadline := time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
for _, ch := range ka.chs {
select {
case ch <- (*LeaseKeepAliveResponse)(resp):
ka.deadline = nextDeadline
case ch <- karesp:
ka.nextKeepAlive = nextKeepAlive
default:
}
}
}
// deadlineLoop reaps any keep alive channels that have not received a response
// within the lease TTL
func (l *lessor) deadlineLoop() {
for {
select {
case <-time.After(time.Second):
case <-l.donec:
return
}
now := time.Now()
l.mu.Lock()
for id, ka := range l.keepAlives {
if ka.deadline.Before(now) {
// waited too long for response; lease may be expired
ka.Close()
delete(l.keepAlives, id)
}
}
l.mu.Unlock()
}
}
// sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
for {
select {
case <-time.After(500 * time.Millisecond):
case <-stream.Context().Done():
return
case <-l.donec:
return
case <-l.stopCtx.Done():
@@ -327,7 +395,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
now := time.Now()
l.mu.Lock()
for id, ka := range l.keepAlives {
if ka.deadline.Before(now) {
if ka.nextKeepAlive.Before(now) {
tosend = append(tosend, id)
}
}
@@ -343,57 +411,18 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
}
}
func (l *lessor) getRemote() pb.LeaseClient {
l.mu.Lock()
defer l.mu.Unlock()
return l.remote
}
func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
l.mu.Lock()
defer l.mu.Unlock()
return l.stream
}
func (l *lessor) switchRemoteAndStream(prevErr error) error {
l.mu.Lock()
conn := l.conn
l.mu.Unlock()
var (
err error
newConn *grpc.ClientConn
)
if prevErr != nil {
conn.Close()
newConn, err = l.c.retryConnection(conn, prevErr)
if err != nil {
return err
}
}
l.mu.Lock()
if newConn != nil {
l.conn = newConn
}
l.remote = pb.NewLeaseClient(l.conn)
l.mu.Unlock()
serr := l.newStream()
if serr != nil {
return serr
}
return nil
}
func (l *lessor) newStream() error {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.getRemote().LeaseKeepAlive(sctx)
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
if err != nil {
cancel()
return err
return toErr(sctx, err)
}
l.mu.Lock()

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
package clientv3
import (
"sync"
"io"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
@@ -45,25 +45,20 @@ type Maintenance interface {
// times with different endpoints.
Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
// Status gets the status of the member.
// Status gets the status of the endpoint.
Status(ctx context.Context, endpoint string) (*StatusResponse, error)
// Snapshot provides a reader for a snapshot of a backend.
Snapshot(ctx context.Context) (io.ReadCloser, error)
}
type maintenance struct {
c *Client
mu sync.Mutex
conn *grpc.ClientConn // conn in-use
c *Client
remote pb.MaintenanceClient
}
func NewMaintenance(c *Client) Maintenance {
conn := c.ActiveConnection()
return &maintenance{
c: c,
conn: conn,
remote: pb.NewMaintenanceClient(conn),
}
return &maintenance{c: c, remote: pb.NewMaintenanceClient(c.conn)}
}
func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
@@ -73,15 +68,12 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
Alarm: pb.AlarmType_NONE, // all
}
for {
resp, err := m.getRemote().Alarm(ctx, req)
resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
if err == nil {
return (*AlarmResponse)(resp), nil
}
if isHalted(ctx, err) {
return nil, err
}
if err = m.switchRemote(err); err != nil {
return nil, err
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
}
}
@@ -96,38 +88,36 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
ar, err := m.AlarmList(ctx)
if err != nil {
return nil, err
return nil, toErr(ctx, err)
}
ret := AlarmResponse{}
for _, am := range ar.Alarms {
dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
if derr != nil {
return nil, derr
return nil, toErr(ctx, derr)
}
ret.Alarms = append(ret.Alarms, dresp.Alarms...)
}
return &ret, nil
}
resp, err := m.getRemote().Alarm(ctx, req)
resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
if err == nil {
return (*AlarmResponse)(resp), nil
}
if !isHalted(ctx, err) {
go m.switchRemote(err)
}
return nil, err
return nil, toErr(ctx, err)
}
func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
conn, err := m.c.Dial(endpoint)
if err != nil {
return nil, err
return nil, toErr(ctx, err)
}
defer conn.Close()
remote := pb.NewMaintenanceClient(conn)
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false))
if err != nil {
return nil, err
return nil, toErr(ctx, err)
}
return (*DefragmentResponse)(resp), nil
}
@@ -135,30 +125,40 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
conn, err := m.c.Dial(endpoint)
if err != nil {
return nil, err
return nil, toErr(ctx, err)
}
defer conn.Close()
remote := pb.NewMaintenanceClient(conn)
resp, err := remote.Status(ctx, &pb.StatusRequest{})
resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false))
if err != nil {
return nil, err
return nil, toErr(ctx, err)
}
return (*StatusResponse)(resp), nil
}
func (m *maintenance) getRemote() pb.MaintenanceClient {
m.mu.Lock()
defer m.mu.Unlock()
return m.remote
}
func (m *maintenance) switchRemote(prevErr error) error {
m.mu.Lock()
defer m.mu.Unlock()
newConn, err := m.c.retryConnection(m.conn, prevErr)
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false))
if err != nil {
return err
return nil, toErr(ctx, err)
}
m.conn = newConn
m.remote = pb.NewMaintenanceClient(m.conn)
return nil
pr, pw := io.Pipe()
go func() {
for {
resp, err := ss.Recv()
if err != nil {
pw.CloseWithError(err)
return
}
if resp == nil && err == nil {
break
}
if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr)
return
}
}
pw.Close()
}()
return pr, nil
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -41,6 +41,8 @@ type Op struct {
limit int64
sort *SortOption
serializable bool
keysOnly bool
countOnly bool
// for range, watch
rev int64
@@ -53,21 +55,29 @@ type Op struct {
leaseID LeaseID
}
func (op Op) toRequestUnion() *pb.RequestUnion {
func (op Op) toRequestOp() *pb.RequestOp {
switch op.t {
case tRange:
r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev, Serializable: op.serializable}
r := &pb.RangeRequest{
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
CountOnly: op.countOnly,
}
if op.sort != nil {
r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}
return &pb.RequestUnion{Request: &pb.RequestUnion_RequestRange{RequestRange: r}}
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}}
case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
return &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: r}}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
return &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{RequestDeleteRange: r}}
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
default:
panic("Unknown Op")
}
@@ -97,6 +107,8 @@ func OpDelete(key string, opts ...OpOption) Op {
panic("unexpected sort in delete")
case ret.serializable:
panic("unexpected serializable in delete")
case ret.countOnly:
panic("unexpected countOnly in delete")
}
return ret
}
@@ -114,7 +126,9 @@ func OpPut(key, val string, opts ...OpOption) Op {
case ret.sort != nil:
panic("unexpected sort in put")
case ret.serializable:
panic("unexpected serializable in delete")
panic("unexpected serializable in put")
case ret.countOnly:
panic("unexpected countOnly in delete")
}
return ret
}
@@ -131,6 +145,8 @@ func opWatch(key string, opts ...OpOption) Op {
panic("unexpected sort in watch")
case ret.serializable:
panic("unexpected serializable in watch")
case ret.countOnly:
panic("unexpected countOnly in delete")
}
return ret
}
@@ -166,6 +182,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
}
}
// GetPrefixRangeEnd gets the range end of the prefix.
// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
func GetPrefixRangeEnd(prefix string) string {
return string(getPrefix([]byte(prefix)))
}
func getPrefix(key []byte) []byte {
end := make([]byte, len(key))
copy(end, key)
@@ -198,7 +220,7 @@ func WithRange(endKey string) OpOption {
}
// WithFromKey specifies the range of 'Get' or 'Delete' requests
// to be equal or greater than they key in the argument.
// to be equal or greater than the key in the argument.
func WithFromKey() OpOption { return WithRange("\x00") }
// WithSerializable makes 'Get' request serializable. By default,
@@ -208,6 +230,17 @@ func WithSerializable() OpOption {
return func(op *Op) { op.serializable = true }
}
// WithKeysOnly makes the 'Get' request return only the keys and the corresponding
// values will be omitted.
func WithKeysOnly() OpOption {
return func(op *Op) { op.keysOnly = true }
}
// WithCountOnly makes the 'Get' request return only the count of keys.
func WithCountOnly() OpOption {
return func(op *Op) { op.countOnly = true }
}
// WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -19,17 +19,20 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// Txn is the interface that wraps mini-transactions.
//
// Tx.If(
// Compare(Value(k1), ">", v1),
// Compare(Version(k1), "=", 2)
// ).Then(
// OpPut(k2,v2), OpPut(k3,v3)
// ).Else(
// OpPut(k4,v4), OpPut(k5,v5)
// ).Commit()
//
// Tx.If(
// Compare(Value(k1), ">", v1),
// Compare(Version(k1), "=", 2)
// ).Then(
// OpPut(k2,v2), OpPut(k3,v3)
// ).Else(
// OpPut(k4,v4), OpPut(k5,v5)
// ).Commit()
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
@@ -63,8 +66,8 @@ type txn struct {
cmps []*pb.Compare
sus []*pb.RequestUnion
fas []*pb.RequestUnion
sus []*pb.RequestOp
fas []*pb.RequestOp
}
func (txn *txn) If(cs ...Cmp) Txn {
@@ -107,7 +110,7 @@ func (txn *txn) Then(ops ...Op) Txn {
for _, op := range ops {
txn.isWrite = txn.isWrite || op.isWrite()
txn.sus = append(txn.sus, op.toRequestUnion())
txn.sus = append(txn.sus, op.toRequestOp())
}
return txn
@@ -125,7 +128,7 @@ func (txn *txn) Else(ops ...Op) Txn {
for _, op := range ops {
txn.isWrite = txn.isWrite || op.isWrite()
txn.fas = append(txn.fas, op.toRequestUnion())
txn.fas = append(txn.fas, op.toRequestOp())
}
return txn
@@ -134,27 +137,25 @@ func (txn *txn) Else(ops ...Op) Txn {
func (txn *txn) Commit() (*TxnResponse, error) {
txn.mu.Lock()
defer txn.mu.Unlock()
kv := txn.kv
for {
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
resp, err := kv.getRemote().Txn(txn.ctx, r)
resp, err := txn.commit()
if err == nil {
return (*TxnResponse)(resp), nil
return resp, err
}
if isHalted(txn.ctx, err) {
return nil, err
if isHaltErr(txn.ctx, err) {
return nil, toErr(txn.ctx, err)
}
if txn.isWrite {
go kv.switchRemote(err)
return nil, err
}
if nerr := kv.switchRemote(err); nerr != nil {
return nil, nerr
return nil, toErr(txn.ctx, err)
}
}
}
func (txn *txn) commit() (*TxnResponse, error) {
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
resp, err := txn.kv.remote.Txn(txn.ctx, r, grpc.FailFast(false))
if err != nil {
return nil, err
}
return (*TxnResponse)(resp), nil
}

View File

@@ -1,4 +1,4 @@
// Copyright 2016 CoreOS, Inc.
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -17,20 +17,23 @@ package clientv3
import (
"fmt"
"sync"
"time"
v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
storagepb "github.com/coreos/etcd/storage/storagepb"
mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
const (
EventTypeDelete = storagepb.DELETE
EventTypePut = storagepb.PUT
EventTypeDelete = mvccpb.DELETE
EventTypePut = mvccpb.PUT
closeSendErrTimeout = 250 * time.Millisecond
)
type Event storagepb.Event
type Event mvccpb.Event
type WatchChan <-chan WatchResponse
@@ -39,7 +42,7 @@ type Watcher interface {
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
// 'opts' can be: 'WithRev' and/or 'WitchPrefix'.
// 'opts' can be: 'WithRev' and/or 'WithPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
@@ -57,6 +60,8 @@ type WatchResponse struct {
// If the watch failed and the stream was about to close, before the channel is closed,
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
closeErr error
}
// IsCreate returns true if the event tells that the key is newly created.
@@ -71,10 +76,12 @@ func (e *Event) IsModify() bool {
// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
if wr.CompactRevision != 0 {
switch {
case wr.closeErr != nil:
return v3rpc.Error(wr.closeErr)
case wr.CompactRevision != 0:
return v3rpc.ErrCompacted
}
if wr.Canceled {
case wr.Canceled:
return v3rpc.ErrFutureRev
}
return nil
@@ -87,18 +94,28 @@ func (wr *WatchResponse) IsProgressNotify() bool {
// watcher implements the Watcher interface
type watcher struct {
c *Client
conn *grpc.ClientConn
remote pb.WatchClient
// mu protects the grpc streams map
mu sync.RWMutex
// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
}
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
// ctx controls internal remote.Watch requests
ctx context.Context
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc
// streams holds all active watchers
streams map[int64]*watcherStream
// mu protects the streams map
mu sync.RWMutex
// streams holds all active watchers
streams map[int64]*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
@@ -108,8 +125,11 @@ type watcher struct {
stopc chan struct{}
// donec closes to broadcast shutdown
donec chan struct{}
// errc transmits errors from grpc Recv
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// the error that closed the watch stream
closeErr error
}
// watchRequest is issued by the subscriber to start a new watcher
@@ -126,6 +146,7 @@ type watchRequest struct {
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
initReq watchRequest
// outc publishes watch responses to subscriber
@@ -141,15 +162,30 @@ type watcherStream struct {
}
func NewWatcher(c *Client) Watcher {
ctx, cancel := context.WithCancel(context.Background())
conn := c.ActiveConnection()
return &watcher{
remote: pb.NewWatchClient(c.conn),
streams: make(map[string]*watchGrpcStream),
}
}
w := &watcher{
c: c,
conn: conn,
remote: pb.NewWatchClient(conn),
// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)
// ctx with only the values; never Done
type valCtx struct{ context.Context }
func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
func (vc *valCtx) Err() error { return nil }
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel,
streams: make(map[int64]*watcherStream),
@@ -159,8 +195,8 @@ func NewWatcher(c *Client) Watcher {
donec: make(chan struct{}),
errc: make(chan error, 1),
}
go w.run()
return w
go wgs.run()
return wgs
}
// Watch posts a watch request to run() and waits for a new watcher channel
@@ -178,13 +214,41 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
}
ok := false
ctxKey := fmt.Sprintf("%v", ctx)
// find or allocate appropriate grpc watch stream
w.mu.Lock()
if w.streams == nil {
// closed
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
// couldn't create channel; return closed channel
closeCh := make(chan WatchResponse, 1)
// submit request
select {
case w.reqc <- wr:
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
case <-w.donec:
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
// receive channel
@@ -193,26 +257,44 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case ret := <-retc:
return ret
case <-ctx.Done():
case <-w.donec:
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
}
// couldn't create channel; return closed channel
ch := make(chan WatchResponse)
close(ch)
return ch
close(closeCh)
return closeCh
}
func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
func (w *watcher) Close() (err error) {
w.mu.Lock()
streams := w.streams
w.streams = nil
w.mu.Unlock()
for _, wgs := range streams {
if werr := wgs.Close(); werr != nil {
err = werr
}
}
<-w.donec
return <-w.errc
return err
}
func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
func (w *watchGrpcStream) Close() (err error) {
close(w.stopc)
<-w.donec
select {
case err = <-w.errc:
default:
}
return toErr(w.ctx, err)
}
func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil {
// no pending request; ignore
return
@@ -265,27 +347,32 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
}
// closeStream closes the watcher resources and removes it
func (w *watcher) closeStream(ws *watcherStream) {
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
close(ws.outc)
// shutdown serveStream
close(ws.recvc)
delete(w.streams, ws.id)
}
// run is the root of the goroutines for managing a watcher client
func (w *watcher) run() {
func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient
var closeErr error
defer func() {
w.owner.mu.Lock()
w.closeErr = closeErr
if w.owner.streams != nil {
delete(w.owner.streams, w.ctxKey)
}
close(w.donec)
w.owner.mu.Unlock()
w.cancel()
}()
// start a stream with the etcd grpc server
wc, wcerr := w.newWatchClient()
if wcerr != nil {
w.errc <- wcerr
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
@@ -314,6 +401,18 @@ func (w *watcher) run() {
curReqC = w.reqc
case pbresp.Canceled:
delete(cancelSet, pbresp.WatchId)
// shutdown serveStream, if any
w.mu.Lock()
if ws, ok := w.streams[pbresp.WatchId]; ok {
close(ws.recvc)
delete(w.streams, ws.id)
}
numStreams := len(w.streams)
w.mu.Unlock()
if numStreams == 0 {
// don't leak watcher streams
return
}
default:
// dispatch to appropriate watch stream
if ok := w.dispatchEvent(pbresp); ok {
@@ -334,9 +433,12 @@ func (w *watcher) run() {
}
// watch client failed to recv; spawn another if possible
// TODO report watch client errors from errc?
case <-w.errc:
if wc, wcerr = w.newWatchClient(); wcerr != nil {
w.errc <- wcerr
case err := <-w.errc:
if toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
return
}
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
curReqC = w.reqc
@@ -345,7 +447,6 @@ func (w *watcher) run() {
}
cancelSet = make(map[int64]struct{})
case <-w.stopc:
w.errc <- nil
return
}
@@ -365,7 +466,7 @@ func (w *watcher) run() {
}
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
w.mu.RLock()
defer w.mu.RUnlock()
ws, ok := w.streams[pbresp.WatchId]
@@ -385,7 +486,7 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
}
// serveWatchClient forwards messages from the grpc stream to run()
func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) {
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
for {
resp, err := wc.Recv()
if err != nil {
@@ -404,7 +505,8 @@ func (w *watcher) serveWatchClient(wc pb.Watch_WatchClient) {
}
// serveStream forwards watch responses from run() to the subscriber
func (w *watcher) serveStream(ws *watcherStream) {
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
resuming := false
@@ -440,7 +542,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
return
}
// resume up to last seen event if disconnected
if resuming {
if resuming && wr.Err() == nil {
resuming = false
// trim events already seen
for i := 0; i < len(wr.Events); i++ {
@@ -454,6 +556,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
break
}
}
resuming = false
// TODO don't keep buffering if subscriber stops reading
wrs = append(wrs, wr)
case resumeRev := <-ws.resumec:
@@ -468,17 +571,28 @@ func (w *watcher) serveStream(ws *watcherStream) {
}
case <-w.donec:
closing = true
closeErr = w.closeErr
case <-ws.initReq.ctx.Done():
closing = true
}
}
// try to send off close error
if closeErr != nil {
select {
case ws.outc <- WatchResponse{closeErr: w.closeErr}:
case <-w.donec:
case <-time.After(closeSendErrTimeout):
}
}
w.mu.Lock()
w.closeStream(ws)
w.mu.Unlock()
// lazily send cancel message if events on missing id
}
func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) {
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
return nil, rerr
@@ -488,7 +602,7 @@ func (w *watcher) newWatchClient() (pb.Watch_WatchClient, error) {
}
// resume creates a new WatchClient with all current watchers reestablished
func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
for {
if ws, err = w.openWatchClient(); err != nil {
break
@@ -496,31 +610,34 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
break
}
}
return ws, err
return ws, v3rpc.Error(err)
}
// openWatchClient retries opening a watchclient until retryConnection fails
func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for {
if ws, err = w.remote.Watch(w.ctx); ws != nil {
break
} else if isHalted(w.ctx, err) {
select {
case <-w.stopc:
if err == nil {
err = context.Canceled
}
return nil, err
default:
}
newConn, nerr := w.c.retryConnection(w.conn, nil)
if nerr != nil {
return nil, nerr
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
break
}
if isHaltErr(w.ctx, err) {
return nil, v3rpc.Error(err)
}
w.conn = newConn
w.remote = pb.NewWatchClient(w.conn)
}
return ws, nil
}
// resumeWatchers rebuilds every registered watcher on a new client
func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
streams := []*watcherStream{}
func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RLock()
streams := make([]*watcherStream, 0, len(w.streams))
for _, ws := range w.streams {
streams = append(streams, ws)
}