From d53add76dda7a8f2f8c5fad71b44f6750c453aa1 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 16 Feb 2017 09:10:26 +0100 Subject: [PATCH] Update etcd Godeps to 3.0.17 --- Godeps/Godeps.json | 208 +++++++++--------- vendor/BUILD | 1 + .../github.com/coreos/etcd/clientv3/lease.go | 53 ++--- vendor/github.com/coreos/etcd/clientv3/op.go | 5 +- .../github.com/coreos/etcd/clientv3/watch.go | 97 +++++--- .../coreos/etcd/etcdserver/api/v3rpc/lease.go | 4 +- .../etcdserver/api/v3rpc/rpctypes/error.go | 30 ++- .../coreos/etcd/etcdserver/api/v3rpc/util.go | 11 + .../coreos/etcd/etcdserver/server.go | 7 +- .../coreos/etcd/etcdserver/v3_server.go | 44 ++-- .../coreos/etcd/lease/leasehttp/http.go | 15 +- vendor/github.com/coreos/etcd/lease/lessor.go | 34 +-- vendor/github.com/coreos/etcd/mvcc/watcher.go | 7 + .../github.com/coreos/etcd/version/version.go | 2 +- 14 files changed, 297 insertions(+), 221 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 55e0c83c02b..2754b76f914 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -423,263 +423,263 @@ }, { "ImportPath": "github.com/coreos/etcd/alarm", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/auth", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/auth/authpb", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/client", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/clientv3", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/compactor", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/discovery", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/error", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http/httptypes", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/auth", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/etcdserverpb", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/membership", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/etcdserver/stats", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/integration", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/lease", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/lease/leasehttp", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/lease/leasepb", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/mvcc", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/mvcc/backend", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/mvcc/mvccpb", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/adt", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/contention", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/crc", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/fileutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/httputil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/idutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/ioutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/logutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/netutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/pathutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/pbutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/runtime", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/schedule", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/testutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/tlsutil", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/transport", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/types", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/pkg/wait", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/raft", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/raft/raftpb", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/rafthttp", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/snap", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/snap/snappb", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/store", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/version", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/wal", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/etcd/wal/walpb", - "Comment": "v3.0.14", - "Rev": "8a37349097a592db79ba3087f3cae9cd4b0af21c" + "Comment": "v3.0.17", + "Rev": "cc198e22d3b8fd7ec98304c95e68ee375be54589" }, { "ImportPath": "github.com/coreos/go-oidc/http", diff --git a/vendor/BUILD b/vendor/BUILD index bca564119a4..db8bede64fc 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -1609,6 +1609,7 @@ go_library( deps = [ "//vendor:github.com/coreos/etcd/etcdserver/etcdserverpb", "//vendor:github.com/coreos/etcd/lease", + "//vendor:golang.org/x/net/context", ], ) diff --git a/vendor/github.com/coreos/etcd/clientv3/lease.go b/vendor/github.com/coreos/etcd/clientv3/lease.go index bf8919c349a..cf7c9ba8bc3 100644 --- a/vendor/github.com/coreos/etcd/clientv3/lease.go +++ b/vendor/github.com/coreos/etcd/clientv3/lease.go @@ -143,9 +143,6 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err if isHaltErr(cctx, err) { return nil, toErr(ctx, err) } - if nerr := l.newStream(); nerr != nil { - return nil, nerr - } } } @@ -164,9 +161,6 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, if isHaltErr(ctx, err) { return nil, toErr(ctx, err) } - if nerr := l.newStream(); nerr != nil { - return nil, nerr - } } } @@ -213,10 +207,6 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive if isHaltErr(ctx, err) { return nil, toErr(ctx, err) } - - if nerr := l.newStream(); nerr != nil { - return nil, nerr - } } } @@ -312,10 +302,23 @@ func (l *lessor) recvKeepAliveLoop() { // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { - if err := l.newStream(); err != nil { + sctx, cancel := context.WithCancel(l.stopCtx) + stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) + if err = toErr(sctx, err); err != nil { + cancel() return nil, err } - stream := l.getKeepAliveStream() + + l.mu.Lock() + defer l.mu.Unlock() + if l.stream != nil && l.streamCancel != nil { + l.stream.CloseSend() + l.streamCancel() + } + + l.streamCancel = cancel + l.stream = stream + go l.sendKeepAliveLoop(stream) return stream, nil } @@ -411,32 +414,6 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } } -func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient { - l.mu.Lock() - defer l.mu.Unlock() - return l.stream -} - -func (l *lessor) newStream() error { - sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) - if err != nil { - cancel() - return toErr(sctx, err) - } - - l.mu.Lock() - defer l.mu.Unlock() - if l.stream != nil && l.streamCancel != nil { - l.stream.CloseSend() - l.streamCancel() - } - - l.streamCancel = cancel - l.stream = stream - return nil -} - func (ka *keepAlive) Close() { close(ka.donec) for _, ch := range ka.chs { diff --git a/vendor/github.com/coreos/etcd/clientv3/op.go b/vendor/github.com/coreos/etcd/clientv3/op.go index e2afdd52833..d3ed55b6274 100644 --- a/vendor/github.com/coreos/etcd/clientv3/op.go +++ b/vendor/github.com/coreos/etcd/clientv3/op.go @@ -215,14 +215,15 @@ func WithPrefix() OpOption { } } -// WithRange specifies the range of 'Get' or 'Delete' requests. +// WithRange specifies the range of 'Get', 'Delete', 'Watch' requests. // For example, 'Get' requests with 'WithRange(end)' returns // the keys in the range [key, end). +// endKey must be lexicographically greater than start key. func WithRange(endKey string) OpOption { return func(op *Op) { op.end = []byte(endKey) } } -// WithFromKey specifies the range of 'Get' or 'Delete' requests +// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests // to be equal or greater than the key in the argument. func WithFromKey() OpOption { return WithRange("\x00") } diff --git a/vendor/github.com/coreos/etcd/clientv3/watch.go b/vendor/github.com/coreos/etcd/clientv3/watch.go index 10b97f2ee70..f6e45f9bf91 100644 --- a/vendor/github.com/coreos/etcd/clientv3/watch.go +++ b/vendor/github.com/coreos/etcd/clientv3/watch.go @@ -125,8 +125,6 @@ type watchGrpcStream struct { reqc chan *watchRequest // respc receives data from the watch client respc chan *pb.WatchResponse - // stopc is sent to the main goroutine to stop all processing - stopc chan struct{} // donec closes to broadcast shutdown donec chan struct{} // errc transmits errors from grpc Recv to the watch stream reconn logic @@ -204,7 +202,6 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { respc: make(chan *pb.WatchResponse), reqc: make(chan *watchRequest), - stopc: make(chan struct{}), donec: make(chan struct{}), errc: make(chan error, 1), closingc: make(chan *watcherStream), @@ -300,7 +297,7 @@ func (w *watcher) Close() (err error) { } func (w *watchGrpcStream) Close() (err error) { - close(w.stopc) + w.cancel() <-w.donec select { case err = <-w.errc: @@ -347,7 +344,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { // close subscriber's channel if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr}) - } else { + } else if ws.outc != nil { close(ws.outc) } if ws.id != -1 { @@ -472,7 +469,7 @@ func (w *watchGrpcStream) run() { wc.Send(ws.initReq.toPB()) } cancelSet = make(map[int64]struct{}) - case <-w.stopc: + case <-w.ctx.Done(): return case ws := <-w.closingc: w.closeSubstream(ws) @@ -597,6 +594,8 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1 } ws.initReq.rev = nextRev + case <-w.ctx.Done(): + return case <-ws.initReq.ctx.Done(): return case <-resumec: @@ -608,34 +607,78 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ } func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { - // connect to grpc stream + // mark all substreams as resuming + close(w.resumec) + w.resumec = make(chan struct{}) + w.joinSubstreams() + for _, ws := range w.substreams { + ws.id = -1 + w.resuming = append(w.resuming, ws) + } + // strip out nils, if any + var resuming []*watcherStream + for _, ws := range w.resuming { + if ws != nil { + resuming = append(resuming, ws) + } + } + w.resuming = resuming + w.substreams = make(map[int64]*watcherStream) + + // connect to grpc stream while accepting watcher cancelation + stopc := make(chan struct{}) + donec := w.waitCancelSubstreams(stopc) wc, err := w.openWatchClient() + close(stopc) + <-donec + + // serve all non-closing streams, even if there's a client error + // so that the teardown path can shutdown the streams as expected. + for _, ws := range w.resuming { + if ws.closing { + continue + } + ws.donec = make(chan struct{}) + go w.serveSubstream(ws, w.resumec) + } + if err != nil { return nil, v3rpc.Error(err) } - // mark all substreams as resuming - if len(w.substreams)+len(w.resuming) > 0 { - close(w.resumec) - w.resumec = make(chan struct{}) - w.joinSubstreams() - for _, ws := range w.substreams { - ws.id = -1 - w.resuming = append(w.resuming, ws) - } - for _, ws := range w.resuming { - if ws == nil || ws.closing { - continue - } - ws.donec = make(chan struct{}) - go w.serveSubstream(ws, w.resumec) - } - } - w.substreams = make(map[int64]*watcherStream) + // receive data from new grpc stream go w.serveWatchClient(wc) return wc, nil } +func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} { + var wg sync.WaitGroup + wg.Add(len(w.resuming)) + donec := make(chan struct{}) + for i := range w.resuming { + go func(ws *watcherStream) { + defer wg.Done() + if ws.closing { + return + } + select { + case <-ws.initReq.ctx.Done(): + // closed ws will be removed from resuming + ws.closing = true + close(ws.outc) + ws.outc = nil + go func() { w.closingc <- ws }() + case <-stopc: + } + }(w.resuming[i]) + } + go func() { + defer close(donec) + wg.Wait() + }() + return donec +} + // joinSubstream waits for all substream goroutines to complete func (w *watchGrpcStream) joinSubstreams() { for _, ws := range w.substreams { @@ -652,9 +695,9 @@ func (w *watchGrpcStream) joinSubstreams() { func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { select { - case <-w.stopc: + case <-w.ctx.Done(): if err == nil { - return nil, context.Canceled + return nil, w.ctx.Err() } return nil, err default: diff --git a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/lease.go b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/lease.go index 108e1387ad0..c761f79c3d0 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/lease.go +++ b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/lease.go @@ -73,14 +73,14 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}} ls.hdr.fill(resp.Header) - ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID)) + ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID)) if err == lease.ErrLeaseNotFound { err = nil ttl = 0 } if err != nil { - return err + return togRPCError(err) } resp.TTL = ttl diff --git a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go index ecf5a20674b..97c75620ef7 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go +++ b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes/error.go @@ -49,9 +49,13 @@ var ( ErrGRPCRoleNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role is not granted to the user") ErrGRPCPermissionNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: permission is not granted to the role") - ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader") - ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable") - ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped") + ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader") + ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable") + ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped") + ErrGRPCTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out") + ErrGRPCTimeoutDueToLeaderFail = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure") + ErrGRPCTimeoutDueToConnectionLost = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost") + ErrGRPCUnhealthy = grpc.Errorf(codes.Unavailable, "etcdserver: unhealthy cluster") errStringToError = map[string]error{ grpc.ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey, @@ -82,9 +86,13 @@ var ( grpc.ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted, grpc.ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted, - grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, - grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable, - grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped, + grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, + grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable, + grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped, + grpc.ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout, + grpc.ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail, + grpc.ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost, + grpc.ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy, } // client-side error @@ -116,9 +124,13 @@ var ( ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted) ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted) - ErrNoLeader = Error(ErrGRPCNoLeader) - ErrNotCapable = Error(ErrGRPCNotCapable) - ErrStopped = Error(ErrGRPCStopped) + ErrNoLeader = Error(ErrGRPCNoLeader) + ErrNotCapable = Error(ErrGRPCNotCapable) + ErrStopped = Error(ErrGRPCStopped) + ErrTimeout = Error(ErrGRPCTimeout) + ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail) + ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost) + ErrUnhealthy = Error(ErrGRPCUnhealthy) ) // EtcdError defines gRPC server errors. diff --git a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/util.go b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/util.go index 997e57892f0..47f7e85f0aa 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/util.go +++ b/vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/util.go @@ -38,6 +38,17 @@ func togRPCError(err error) error { case etcdserver.ErrNoSpace: return rpctypes.ErrGRPCNoSpace + case etcdserver.ErrNoLeader: + return rpctypes.ErrGRPCNoLeader + case etcdserver.ErrStopped: + return rpctypes.ErrGRPCStopped + case etcdserver.ErrTimeout: + return rpctypes.ErrGRPCTimeout + case etcdserver.ErrTimeoutDueToLeaderFail: + return rpctypes.ErrGRPCTimeoutDueToLeaderFail + case etcdserver.ErrTimeoutDueToConnectionLost: + return rpctypes.ErrGRPCTimeoutDueToConnectionLost + case auth.ErrRootUserNotExist: return rpctypes.ErrGRPCRootUserNotExist case auth.ErrRootRoleNotExist: diff --git a/vendor/github.com/coreos/etcd/etcdserver/server.go b/vendor/github.com/coreos/etcd/etcdserver/server.go index d2324e57c2d..3644e7d513a 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/server.go +++ b/vendor/github.com/coreos/etcd/etcdserver/server.go @@ -18,6 +18,7 @@ import ( "encoding/json" "expvar" "fmt" + "math" "math/rand" "net/http" "os" @@ -398,11 +399,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} srv.be = be - srv.lessor = lease.NewLessor(srv.be) + minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond + srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds()))) - // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. - // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor(srv.be) srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex) if beExist { kvindex := srv.kv.ConsistentIndex() diff --git a/vendor/github.com/coreos/etcd/etcdserver/v3_server.go b/vendor/github.com/coreos/etcd/etcdserver/v3_server.go index dcd0f0d8a48..743dd2af5fc 100644 --- a/vendor/github.com/coreos/etcd/etcdserver/v3_server.go +++ b/vendor/github.com/coreos/etcd/etcdserver/v3_server.go @@ -20,6 +20,7 @@ import ( "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/mvcc" @@ -54,7 +55,7 @@ type Lessor interface { // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error // is returned. - LeaseRenew(id lease.LeaseID) (int64, error) + LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) } type Authenticator interface { @@ -218,7 +219,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) return result.resp.(*pb.LeaseRevokeResponse), nil } -func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { +func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { ttl, err := s.lessor.Renew(id) if err == nil { return ttl, nil @@ -228,29 +229,44 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { } // renewals don't go through raft; forward to leader manually + cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) + defer cancel() + + // renewals don't go through raft; forward to leader manually + for cctx.Err() == nil && err != nil { + leader, lerr := s.waitLeader(cctx) + if lerr != nil { + return -1, lerr + } + for _, url := range leader.PeerURLs { + lurl := url + "/leases" + ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) + if err == nil || err == lease.ErrLeaseNotFound { + return ttl, err + } + } + } + return -1, ErrTimeout +} + +func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { leader := s.cluster.Member(s.Leader()) - for i := 0; i < 5 && leader == nil; i++ { + for leader == nil { // wait an election dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond select { case <-time.After(dur): leader = s.cluster.Member(s.Leader()) case <-s.done: - return -1, ErrStopped + return nil, ErrStopped + case <-ctx.Done(): + return nil, ErrNoLeader } } if leader == nil || len(leader.PeerURLs) == 0 { - return -1, ErrNoLeader + return nil, ErrNoLeader } - - for _, url := range leader.PeerURLs { - lurl := url + "/leases" - ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout()) - if err == nil { - break - } - } - return ttl, err + return leader, nil } func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { diff --git a/vendor/github.com/coreos/etcd/lease/leasehttp/http.go b/vendor/github.com/coreos/etcd/lease/leasehttp/http.go index cf95595b6f0..e730b2d2d7f 100644 --- a/vendor/github.com/coreos/etcd/lease/leasehttp/http.go +++ b/vendor/github.com/coreos/etcd/lease/leasehttp/http.go @@ -19,10 +19,10 @@ import ( "fmt" "io/ioutil" "net/http" - "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" + "golang.org/x/net/context" ) // NewHandler returns an http Handler for lease renewals @@ -75,15 +75,22 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // RenewHTTP renews a lease at a given primary server. // TODO: Batch request in future? -func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) { +func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) { // will post lreq protobuf to leader lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal() if err != nil { return -1, err } - cc := &http.Client{Transport: rt, Timeout: timeout} - resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq)) + cc := &http.Client{Transport: rt} + req, err := http.NewRequest("POST", url, bytes.NewReader(lreq)) + if err != nil { + return -1, err + } + req.Header.Set("Content-Type", "application/protobuf") + req.Cancel = ctx.Done() + + resp, err := cc.Do(req) if err != nil { // TODO detect if leader failed and retry? return -1, err diff --git a/vendor/github.com/coreos/etcd/lease/lessor.go b/vendor/github.com/coreos/etcd/lease/lessor.go index 48147b2c12a..682d7376b76 100644 --- a/vendor/github.com/coreos/etcd/lease/lessor.go +++ b/vendor/github.com/coreos/etcd/lease/lessor.go @@ -31,8 +31,6 @@ const ( ) var ( - minLeaseTTL = int64(5) - leaseBucketName = []byte("lease") // do not use maxInt64 since it can overflow time which will add // the offset of unix time (1970yr to seconds). @@ -143,6 +141,10 @@ type lessor struct { // The leased items can be recovered by iterating all the keys in kv. b backend.Backend + // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any + // requests for shorter TTLs are extended to the minimum TTL. + minLeaseTTL int64 + expiredC chan []*Lease // stopC is a channel whose closure indicates that the lessor should be stopped. stopC chan struct{} @@ -150,14 +152,15 @@ type lessor struct { doneC chan struct{} } -func NewLessor(b backend.Backend) Lessor { - return newLessor(b) +func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor { + return newLessor(b, minLeaseTTL) } -func newLessor(b backend.Backend) *lessor { +func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { l := &lessor{ - leaseMap: make(map[LeaseID]*Lease), - b: b, + leaseMap: make(map[LeaseID]*Lease), + b: b, + minLeaseTTL: minLeaseTTL, // expiredC is a small buffered chan to avoid unnecessary blocking. expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), @@ -193,6 +196,10 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, ErrLeaseExists } + if l.TTL < le.minLeaseTTL { + l.TTL = le.minLeaseTTL + } + if le.primary { l.refresh(0) } else { @@ -425,6 +432,9 @@ func (le *lessor) initAndRecover() { panic("failed to unmarshal lease proto item") } ID := LeaseID(lpb.ID) + if lpb.TTL < le.minLeaseTTL { + lpb.TTL = le.minLeaseTTL + } le.leaseMap[ID] = &Lease{ ID: ID, TTL: lpb.TTL, @@ -464,19 +474,11 @@ func (l Lease) persistTo(b backend.Backend) { // refresh refreshes the expiry of the lease. func (l *Lease) refresh(extend time.Duration) { - if l.TTL < minLeaseTTL { - l.TTL = minLeaseTTL - } l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL)) } // forever sets the expiry of lease to be forever. -func (l *Lease) forever() { - if l.TTL < minLeaseTTL { - l.TTL = minLeaseTTL - } - l.expiry = forever -} +func (l *Lease) forever() { l.expiry = forever } type LeaseItem struct { Key string diff --git a/vendor/github.com/coreos/etcd/mvcc/watcher.go b/vendor/github.com/coreos/etcd/mvcc/watcher.go index 9353f835683..8410aa845fd 100644 --- a/vendor/github.com/coreos/etcd/mvcc/watcher.go +++ b/vendor/github.com/coreos/etcd/mvcc/watcher.go @@ -15,6 +15,7 @@ package mvcc import ( + "bytes" "errors" "sync" @@ -96,6 +97,12 @@ type watchStream struct { // Watch creates a new watcher in the stream and returns its WatchID. // TODO: return error if ws is closed? func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID { + // prevent wrong range where key >= end lexicographically + // watch request with 'WithFromKey' has empty-byte range end + if len(end) != 0 && bytes.Compare(key, end) != -1 { + return -1 + } + ws.mu.Lock() defer ws.mu.Unlock() if ws.closed { diff --git a/vendor/github.com/coreos/etcd/version/version.go b/vendor/github.com/coreos/etcd/version/version.go index fd27f1fa6a2..a3122aa4924 100644 --- a/vendor/github.com/coreos/etcd/version/version.go +++ b/vendor/github.com/coreos/etcd/version/version.go @@ -29,7 +29,7 @@ import ( var ( // MinClusterVersion is the min cluster version this etcd binary is compatible with. MinClusterVersion = "2.3.0" - Version = "3.0.14" + Version = "3.0.17" // Git SHA Value will be set during build GitSHA = "Not provided (use ./build instead of go build)"