godep: update etcd to 3.0.12

Hongchao Deng 2016-10-10 09:33:51 -07:00
parent 3c9600e4c4
commit c060417ecb
25 changed files with 1119 additions and 766 deletions

Godeps/Godeps.json (generated)
View File

@@ -353,263 +353,263 @@
},
{
"ImportPath": "github.com/coreos/etcd/alarm",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/auth",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/auth/authpb",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/client",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/clientv3",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/compactor",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/discovery",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/error",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/api",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http/httptypes",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/auth",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/etcdserverpb",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/membership",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/stats",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/integration",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/lease",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/lease/leasehttp",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/lease/leasepb",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/mvcc",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/mvcc/backend",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/mvcc/mvccpb",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/adt",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/contention",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/crc",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/fileutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/httputil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/idutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/ioutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/logutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/netutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/pathutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/pbutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/runtime",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/schedule",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/testutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/tlsutil",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/transport",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/types",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/wait",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/raft",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/raft/raftpb",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/rafthttp",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/snap",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/snap/snappb",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/store",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/version",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/wal",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/etcd/wal/walpb",
"Comment": "v3.0.10",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2"
"Comment": "v3.0.12",
"Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
},
{
"ImportPath": "github.com/coreos/go-oidc/http",
@@ -2252,7 +2252,7 @@
},
{
"ImportPath": "github.com/xiang90/probing",
"Rev": "6a0cc1ae81b4cc11db5e491e030e4b98fba79c19"
"Rev": "07dd2e8dfe18522e9c447ba95f2fe95262f63bb2"
},
{
"ImportPath": "github.com/xyproto/simpleredis",

View File

@@ -21,9 +21,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@@ -157,14 +157,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
resp, err = kv.remote.Put(ctx, r)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil

View File

@@ -47,6 +47,9 @@ type Op struct {
// for range, watch
rev int64
// for watch, put, delete
prevKV bool
// progressNotify is for progress updates.
progressNotify bool
@@ -73,10 +76,10 @@ func (op Op) toRequestOp() *pb.RequestOp {
}
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}}
case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
default:
panic("Unknown Op")
@@ -271,3 +274,11 @@ func WithProgressNotify() OpOption {
op.progressNotify = true
}
}
// WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
// nothing will be returned.
func WithPrevKV() OpOption {
return func(op *Op) {
op.prevKV = true
}
}
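
For a sense of how the new option surfaces to callers, here is a minimal client-side sketch (assumes a reachable etcd endpoint on localhost:2379 and the vendored clientv3 above; error handling trimmed):

package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()
	cli.Put(ctx, "k", "v1")

	// With WithPrevKV, the put response carries the pair it overwrote.
	presp, _ := cli.Put(ctx, "k", "v2", clientv3.WithPrevKV())
	if presp.PrevKv != nil {
		fmt.Printf("replaced %s=%s\n", presp.PrevKv.Key, presp.PrevKv.Value)
	}

	// Deletes report every removed pair in PrevKvs.
	dresp, _ := cli.Delete(ctx, "k", clientv3.WithPrevKV())
	for _, kv := range dresp.PrevKvs {
		fmt.Printf("deleted %s=%s\n", kv.Key, kv.Value)
	}
}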

View File

@@ -61,6 +61,9 @@ type WatchResponse struct {
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// created is used to indicate the creation of the watcher.
created bool
closeErr error
}
@@ -89,7 +92,7 @@ func (wr *WatchResponse) Err() error {
// IsProgressNotify returns true if the WatchResponse is progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
return len(wr.Events) == 0 && !wr.Canceled
return len(wr.Events) == 0 && !wr.Canceled && !wr.created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}
// watcher implements the Watcher interface
@@ -102,6 +105,7 @@ type watcher struct {
streams map[string]*watchGrpcStream
}
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
@@ -112,10 +116,10 @@ type watchGrpcStream struct {
ctxKey string
cancel context.CancelFunc
// mu protects the streams map
mu sync.RWMutex
// streams holds all active watchers
streams map[int64]*watcherStream
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
@@ -130,7 +134,9 @@ type watchGrpcStream struct {
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// the error that closed the watch stream
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error
}
@@ -142,6 +148,8 @@ type watchRequest struct {
rev int64
// progressNotify is for progress updates.
progressNotify bool
// get the previous key-value pair before the event happens
prevKV bool
// retc receives a chan WatchResponse once the watcher is established
retc chan chan WatchResponse
}
@@ -152,15 +160,18 @@ type watcherStream struct {
initReq watchRequest
// outc publishes watch responses to subscriber
outc chan<- WatchResponse
outc chan WatchResponse
// recvc buffers watch responses before publishing
recvc chan *WatchResponse
id int64
// donec closes when the watcherStream goroutine stops.
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing bool
// id is the registered watch id on the grpc stream
id int64
// lastRev is revision last successfully sent over outc
lastRev int64
// resumec indicates the stream must recover at a given revision
resumec chan int64
// buf holds all events received from etcd but not yet consumed by the client
buf []*WatchResponse
}
func NewWatcher(c *Client) Watcher {
@@ -184,12 +195,12 @@ func (vc *valCtx) Err() error { return nil }
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel,
streams: make(map[int64]*watcherStream),
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
@@ -197,6 +208,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
}
go wgs.run()
return wgs
@@ -206,14 +218,14 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{
ctx: ctx,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
retc: retc,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
@@ -257,7 +269,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
// receive channel
if ok {
select {
case ret := <-retc:
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
@@ -288,12 +300,7 @@ func (w *watcher) Close() (err error) {
}
func (w *watchGrpcStream) Close() (err error) {
w.mu.Lock()
if w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
close(w.stopc)
<-w.donec
select {
case err = <-w.errc:
@@ -302,71 +309,57 @@ func (w *watchGrpcStream) Close() (err error) {
return toErr(w.ctx, err)
}
func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil {
// no pending request; ignore
return
}
if resp.Canceled || resp.CompactRevision != 0 {
// a cancel at id creation time means the start revision has
// been compacted out of the store
ret := make(chan WatchResponse, 1)
ret <- WatchResponse{
Header: *resp.Header,
CompactRevision: resp.CompactRevision,
Canceled: true}
close(ret)
pendingReq.retc <- ret
return
}
ret := make(chan WatchResponse)
if resp.WatchId == -1 {
// failed; no channel
close(ret)
pendingReq.retc <- ret
return
}
ws := &watcherStream{
initReq: *pendingReq,
id: resp.WatchId,
outc: ret,
// buffered so unlikely to block on sending while holding mu
recvc: make(chan *WatchResponse, 4),
resumec: make(chan int64),
}
if pendingReq.rev == 0 {
// note the header revision so that a put following a current watcher
// disconnect will arrive on the watcher channel after reconnect
ws.initReq.rev = resp.Header.Revision
}
func (w *watcher) closeStream(wgs *watchGrpcStream) {
w.mu.Lock()
w.streams[ws.id] = ws
close(wgs.donec)
wgs.cancel()
if w.streams != nil {
delete(w.streams, wgs.ctxKey)
}
w.mu.Unlock()
// pass back the subscriber channel for the watcher
pendingReq.retc <- ret
// send messages to subscriber
go w.serveStream(ws)
}
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
close(ws.outc)
delete(w.streams, ws.id)
empty := len(w.streams) == 0
if empty && w.stopc != nil {
w.stopc = nil
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
if resp.WatchId == -1 {
// failed; no channel
close(ws.recvc)
return
}
ws.id = resp.WatchId
w.substreams[ws.id] = ws
}
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
select {
case ws.outc <- *resp:
case <-ws.initReq.ctx.Done():
case <-time.After(closeSendErrTimeout):
}
close(ws.outc)
}
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
// send channel response in case stream was never established
select {
case ws.initReq.retc <- ws.outc:
default:
}
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
} else {
close(ws.outc)
}
if ws.id != -1 {
delete(w.substreams, ws.id)
return
}
for i := range w.resuming {
if w.resuming[i] == ws {
w.resuming[i] = nil
return
}
}
w.mu.Unlock()
return empty
}
// run is the root of the goroutines for managing a watcher client
@@ -374,66 +367,79 @@ func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient
var closeErr error
defer func() {
w.owner.mu.Lock()
w.closeErr = closeErr
if w.owner.streams != nil {
delete(w.owner.streams, w.ctxKey)
}
close(w.donec)
w.owner.mu.Unlock()
w.cancel()
}()
// substreams marked to close but goroutine still running; needed for
// avoiding double-closing recvc on grpc stream teardown
closing := make(map[*watcherStream]struct{})
// already stopped?
w.mu.RLock()
stopc := w.stopc
w.mu.RUnlock()
if stopc == nil {
return
}
defer func() {
w.closeErr = closeErr
// shutdown substreams and resuming substreams
for _, ws := range w.substreams {
if _, ok := closing[ws]; !ok {
close(ws.recvc)
}
}
for _, ws := range w.resuming {
if _, ok := closing[ws]; ws != nil && !ok {
close(ws.recvc)
}
}
w.joinSubstreams()
for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- {
w.closeSubstream(<-w.closingc)
}
w.owner.closeStream(w)
}()
// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
var pendingReq, failedReq *watchRequest
curReqC := w.reqc
cancelSet := make(map[int64]struct{})
for {
select {
// Watch() requested
case pendingReq = <-curReqC:
// no more watch requests until there's a response
curReqC = nil
if err := wc.Send(pendingReq.toPB()); err == nil {
// pendingReq now waits on w.respc
break
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
failedReq = pendingReq
// New events from the watch client
case pbresp := <-w.respc:
switch {
case pbresp.Created:
// response to pending req, try to add
w.addStream(pbresp, pendingReq)
pendingReq = nil
curReqC = w.reqc
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
case pbresp.Canceled:
delete(cancelSet, pbresp.WatchId)
// shutdown serveStream, if any
w.mu.Lock()
if ws, ok := w.streams[pbresp.WatchId]; ok {
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
close(ws.recvc)
delete(w.streams, ws.id)
}
numStreams := len(w.streams)
w.mu.Unlock()
if numStreams == 0 {
// don't leak watcher streams
return
closing[ws] = struct{}{}
}
default:
// dispatch to appropriate watch stream
@@ -454,7 +460,6 @@ func (w *watchGrpcStream) run() {
wc.Send(req)
}
// watch client failed to recv; spawn another if possible
// TODO report watch client errors from errc?
case err := <-w.errc:
if toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
@@ -463,52 +468,58 @@
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
curReqC = w.reqc
if pendingReq != nil {
failedReq = pendingReq
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
case <-stopc:
case <-w.stopc:
return
case ws := <-w.closingc:
if w.closeStream(ws) {
w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown
return
}
}
// send failed; queue for retry
if failedReq != nil {
go func(wr *watchRequest) {
select {
case w.reqc <- wr:
case <-wr.ctx.Done():
case <-w.donec:
}
}(pendingReq)
failedReq = nil
pendingReq = nil
}
}
}
// nextResume chooses the next resuming watcher stream to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
for len(w.resuming) != 0 {
if w.resuming[0] != nil {
return w.resuming[0]
}
w.resuming = w.resuming[1:len(w.resuming)]
}
return nil
}
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
w.mu.RLock()
defer w.mu.RUnlock()
ws, ok := w.streams[pbresp.WatchId]
ws, ok := w.substreams[pbresp.WatchId]
if !ok {
return false
}
events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
if ok {
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
Canceled: pbresp.Canceled}
ws.recvc <- wr
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
created: pbresp.Created,
Canceled: pbresp.Canceled,
}
return ok
select {
case ws.recvc <- wr:
case <-ws.donec:
return false
}
return true
}
// serveWatchClient forwards messages from the grpc stream to run()
@@ -530,132 +541,123 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
}
}
// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
if ws.closing {
panic("created substream goroutine but substream is closing")
}
// nextRev is the minimum expected next revision
nextRev := ws.initReq.rev
resuming := false
defer func() {
// signal that this watcherStream is finished
select {
case w.closingc <- ws:
case <-w.donec:
w.closeStream(ws)
if !resuming {
ws.closing = true
}
close(ws.donec)
if !resuming {
w.closingc <- ws
}
}()
var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
resuming := false
closing := false
for !closing {
for {
curWr := emptyWr
outc := ws.outc
if len(wrs) > 0 {
curWr = wrs[0]
if len(ws.buf) > 0 && ws.buf[0].created {
select {
case ws.initReq.retc <- ws.outc:
default:
}
ws.buf = ws.buf[1:]
}
if len(ws.buf) > 0 {
curWr = ws.buf[0]
} else {
outc = nil
}
select {
case outc <- *curWr:
if wrs[0].Err() != nil {
closing = true
break
}
var newRev int64
if len(wrs[0].Events) > 0 {
newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
} else {
newRev = wrs[0].Header.Revision
}
if newRev != ws.lastRev {
ws.lastRev = newRev
}
wrs[0] = nil
wrs = wrs[1:]
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeStream
if ws.buf[0].Err() != nil {
return
}
// resume up to last seen event if disconnected
if resuming && wr.Err() == nil {
resuming = false
// trim events already seen
for i := 0; i < len(wr.Events); i++ {
if wr.Events[i].Kv.ModRevision > ws.lastRev {
wr.Events = wr.Events[i:]
break
}
}
// only forward new events
if wr.Events[0].Kv.ModRevision == ws.lastRev {
break
}
ws.buf[0] = nil
ws.buf = ws.buf[1:]
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeSubstream
return
}
resuming = false
// TODO don't keep buffering if subscriber stops reading
wrs = append(wrs, wr)
case resumeRev := <-ws.resumec:
wrs = nil
resuming = true
if resumeRev == -1 {
// pause serving stream while resume gets set up
break
// TODO pause channel if buffer gets too large
ws.buf = append(ws.buf, wr)
nextRev = wr.Header.Revision
if len(wr.Events) > 0 {
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
if resumeRev != ws.lastRev {
panic("unexpected resume revision")
}
case <-w.donec:
closing = true
closeErr = w.closeErr
ws.initReq.rev = nextRev
case <-ws.initReq.ctx.Done():
closing = true
return
case <-resumec:
resuming = true
return
}
}
// try to send off close error
if closeErr != nil {
select {
case ws.outc <- WatchResponse{closeErr: w.closeErr}:
case <-w.donec:
case <-time.After(closeSendErrTimeout):
}
}
// lazily send cancel message if events on missing id
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
return nil, rerr
// connect to grpc stream
wc, err := w.openWatchClient()
if err != nil {
return nil, v3rpc.Error(err)
}
go w.serveWatchClient(ws)
return ws, nil
}
// resume creates a new WatchClient with all current watchers reestablished
func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
for {
if ws, err = w.openWatchClient(); err != nil {
break
} else if err = w.resumeWatchers(ws); err == nil {
break
// mark all substreams as resuming
if len(w.substreams)+len(w.resuming) > 0 {
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
for _, ws := range w.resuming {
if ws == nil || ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
}
w.substreams = make(map[int64]*watcherStream)
// receive data from new grpc stream
go w.serveWatchClient(wc)
return wc, nil
}
// joinSubstreams waits for all substream goroutines to complete
func (w *watchGrpcStream) joinSubstreams() {
for _, ws := range w.substreams {
<-ws.donec
}
for _, ws := range w.resuming {
if ws != nil {
<-ws.donec
}
}
return ws, v3rpc.Error(err)
}
// openWatchClient retries opening a watchclient until retryConnection fails
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for {
w.mu.Lock()
stopc := w.stopc
w.mu.Unlock()
if stopc == nil {
select {
case <-w.stopc:
if err == nil {
err = context.Canceled
return nil, context.Canceled
}
return nil, err
default:
}
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
break
@@ -667,63 +669,6 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
return ws, nil
}
// resumeWatchers rebuilds every registered watcher on a new client
func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RLock()
streams := make([]*watcherStream, 0, len(w.streams))
for _, ws := range w.streams {
streams = append(streams, ws)
}
w.mu.RUnlock()
for _, ws := range streams {
// drain recvc so no old WatchResponses (e.g., Created messages)
// are processed while resuming
ws.drain()
// pause serveStream
ws.resumec <- -1
// reconstruct watcher from initial request
if ws.lastRev != 0 {
ws.initReq.rev = ws.lastRev
}
if err := wc.Send(ws.initReq.toPB()); err != nil {
return err
}
// wait for request ack
resp, err := wc.Recv()
if err != nil {
return err
} else if len(resp.Events) != 0 || !resp.Created {
return fmt.Errorf("watcher: unexpected response (%+v)", resp)
}
// id may be different since new remote watcher; update map
w.mu.Lock()
delete(w.streams, ws.id)
ws.id = resp.WatchId
w.streams[ws.id] = ws
w.mu.Unlock()
// unpause serveStream
ws.resumec <- ws.lastRev
}
return nil
}
// drain removes all buffered WatchResponses from the stream's receive channel.
func (ws *watcherStream) drain() {
for {
select {
case <-ws.recvc:
default:
return
}
}
}
// toPB converts an internal watch request structure to its protobuf message
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{
@@ -731,6 +676,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
PrevKv: wr.prevKV,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
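
The watch side picks up the same option; a short sketch of the subscriber view (cli as in the put/delete sketch earlier; key name hypothetical):

// Each event on a prev-kv watcher carries the pair that was replaced,
// unless that revision has already been compacted away.
rch := cli.Watch(context.Background(), "k", clientv3.WithPrevKV())
for wresp := range rch {
	if wresp.IsProgressNotify() {
		continue // heartbeat only, no events
	}
	for _, ev := range wresp.Events {
		if ev.PrevKv != nil {
			fmt.Printf("%s: %s -> %s\n", ev.Kv.Key, ev.PrevKv.Value, ev.Kv.Value)
		}
	}
}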

View File

@@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable
watchable mvcc.WatchableKV
}
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
@@ -82,6 +82,8 @@ type serverWatchStream struct {
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.WatchableKV
gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse
@@ -91,6 +93,7 @@ type serverWatchStream struct {
// progress tracks the watchID that stream might need to send
// progress to.
progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool
// closec indicates the stream is closed.
closec chan struct{}
@@ -101,14 +104,18 @@
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
watchable: ws.watchable,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
@@ -170,9 +177,14 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
if id != -1 && creq.ProgressNotify {
if id != -1 {
sws.mu.Lock()
sws.progress[id] = true
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
sws.mu.Unlock()
}
wr := &pb.WatchResponse{
@@ -198,6 +210,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
@@ -244,8 +257,19 @@ func (sws *serverWatchStream) sendLoop() {
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
sws.mu.Lock()
needPrevKV := sws.prevKV[wresp.WatchID]
sws.mu.Unlock()
for i := range evs {
events[i] = &evs[i]
if needPrevKV {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
wr := &pb.WatchResponse{
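
The lookup in the hunk above leans on MVCC bookkeeping: the pair in effect just before an event is simply whatever the key held at revision ModRevision-1, so no extra per-watcher state is needed. The same idea in isolation (kv and ev standing in for a mvcc.WatchableKV and a mvccpb.Event):

// Sketch: recover the pre-event pair from the MVCC store.
r, err := kv.Range(ev.Kv.Key, nil, mvcc.RangeOptions{Rev: ev.Kv.ModRevision - 1})
if err == nil && len(r.KVs) != 0 {
	prev := &r.KVs[0] // value the key held before the event
	_ = prev
}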

View File

@@ -159,6 +159,22 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev int64
err error
)
var rr *mvcc.RangeResult
if p.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn {
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil {
@@ -174,6 +190,9 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev = a.s.KV().Put(p.Key, p.Value, leaseID)
}
resp.Header.Revision = rev
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
return resp, nil
}
@@ -191,6 +210,21 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
dr.RangeEnd = []byte{}
}
var rr *mvcc.RangeResult
if dr.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn {
n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
if err != nil {
@@ -201,6 +235,11 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
}
resp.Deleted = n
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
}
}
resp.Header.Revision = rev
return resp, nil
}
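
Worth noting from the hunk above: prev_kv is not free. The applier issues an extra range over the affected keys before mutating, inside the transaction when one is open. The shape of that guard, as a hypothetical standalone helper (illustrative only, not a function in the etcd codebase):

// prevKVs fetches prior state only when the request asked for it.
func prevKVs(kv mvcc.KV, txnID int64, key, end []byte, wanted bool) (*mvcc.RangeResult, error) {
	if !wanted {
		return nil, nil
	}
	if txnID != noTxn {
		return kv.TxnRange(txnID, key, end, mvcc.RangeOptions{})
	}
	return kv.Range(key, end, mvcc.RangeOptions{})
}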

View File

@@ -56,6 +56,9 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
if !aa.as.IsPutPermitted(aa.user, r.Key) {
return nil, auth.ErrPermissionDenied
}
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, nil) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.Put(txnID, r)
}
@@ -70,6 +73,9 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
if !aa.as.IsDeleteRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied
}
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.DeleteRange(txnID, r)
}
@@ -99,7 +105,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) bool {
continue
}
if !aa.as.IsDeleteRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
if tv.RequestDeleteRange.PrevKv && !aa.as.IsRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
return false
}
}

View File

@@ -102,9 +102,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@@ -10,9 +10,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

File diff suppressed because it is too large

View File

@@ -396,10 +396,16 @@ message PutRequest {
// lease is the lease ID to associate with the key in the key-value store. A lease
// value of 0 indicates no lease.
int64 lease = 3;
// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
}
message PutResponse {
ResponseHeader header = 1;
// if prev_kv is set in the request, the previous key-value pair will be returned.
mvccpb.KeyValue prev_kv = 2;
}
message DeleteRangeRequest {
@@ -409,12 +415,17 @@ message DeleteRangeRequest {
// If range_end is not given, the range is defined to contain only the key argument.
// If range_end is '\0', the range is all keys greater than or equal to the key argument.
bytes range_end = 2;
// If prev_kv is set, etcd gets the previous key-value pairs before deleting them.
// The previous key-value pairs will be returned in the delete response.
bool prev_kv = 3;
}
message DeleteRangeResponse {
ResponseHeader header = 1;
// deleted is the number of keys deleted by the delete range request.
int64 deleted = 2;
// if prev_kv is set in the request, the previous key-value pairs will be returned.
repeated mvccpb.KeyValue prev_kvs = 3;
}
message RequestOp {
@@ -563,6 +574,9 @@ message WatchCreateRequest {
// wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
}
message WatchCancelRequest {
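
At the wire level prev_kv is just a bool on the request messages, so a caller working against the generated stubs can set it directly; a sketch assuming an established *grpc.ClientConn named conn:

// pb is github.com/coreos/etcd/etcdserver/etcdserverpb.
kvc := pb.NewKVClient(conn)
presp, err := kvc.Put(context.Background(),
	&pb.PutRequest{Key: []byte("k"), Value: []byte("v"), PrevKv: true})
if err == nil && presp.PrevKv != nil {
	fmt.Printf("replaced value: %s\n", presp.PrevKv.Value)
}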

View File

@@ -551,4 +551,4 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
}
// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.Watchable { return s.KV() }
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

View File

@@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@@ -89,6 +89,8 @@ type Event struct {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// prev_kv holds the key-value pair before the event happens.
PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
func (m *Event) Reset() { *m = Event{} }
@@ -181,6 +183,16 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
}
i += n1
}
if m.PrevKv != nil {
data[i] = 0x1a
i++
i = encodeVarintKv(data, i, uint64(m.PrevKv.Size()))
n2, err := m.PrevKv.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n2
}
return i, nil
}
@@ -247,6 +259,10 @@ func (m *Event) Size() (n int) {
l = m.Kv.Size()
n += 1 + l + sovKv(uint64(l))
}
if m.PrevKv != nil {
l = m.PrevKv.Size()
n += 1 + l + sovKv(uint64(l))
}
return n
}
@@ -532,6 +548,39 @@ func (m *Event) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PrevKv", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowKv
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthKv
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.PrevKv == nil {
m.PrevKv = &KeyValue{}
}
if err := m.PrevKv.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipKv(data[iNdEx:])
@@ -659,23 +708,24 @@
)
var fileDescriptorKv = []byte{
// 285 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x44, 0x90, 0x41, 0x4a, 0xc3, 0x40,
0x14, 0x86, 0x33, 0x4d, 0x9b, 0xd6, 0xd7, 0x52, 0xc3, 0x50, 0x70, 0x70, 0x31, 0xc4, 0x6e, 0x2c,
0x08, 0x11, 0xea, 0x0d, 0xc4, 0xac, 0x74, 0x21, 0x21, 0xba, 0x95, 0x34, 0x7d, 0x94, 0x92, 0xa6,
0x13, 0xd2, 0x38, 0x98, 0x9b, 0x78, 0x0a, 0xcf, 0xd1, 0x65, 0x8f, 0x60, 0xe3, 0x45, 0x24, 0x6f,
0x4c, 0xdd, 0x0c, 0xef, 0xff, 0xff, 0x6f, 0x98, 0xff, 0x0d, 0x0c, 0x52, 0xed, 0xe7, 0x85, 0x2a,
0x15, 0x77, 0x32, 0x9d, 0x24, 0xf9, 0xe2, 0x72, 0xb2, 0x52, 0x2b, 0x45, 0xd6, 0x6d, 0x33, 0x99,
0x74, 0xfa, 0xc5, 0x60, 0xf0, 0x88, 0xd5, 0x6b, 0xbc, 0x79, 0x47, 0xee, 0x82, 0x9d, 0x62, 0x25,
0x98, 0xc7, 0x66, 0xa3, 0xb0, 0x19, 0xf9, 0x35, 0x9c, 0x27, 0x05, 0xc6, 0x25, 0xbe, 0x15, 0xa8,
0xd7, 0xbb, 0xb5, 0xda, 0x8a, 0x8e, 0xc7, 0x66, 0x76, 0x38, 0x36, 0x76, 0xf8, 0xe7, 0xf2, 0x2b,
0x18, 0x65, 0x6a, 0xf9, 0x4f, 0xd9, 0x44, 0x0d, 0x33, 0xb5, 0x3c, 0x21, 0x02, 0xfa, 0x1a, 0x0b,
0x4a, 0xbb, 0x94, 0xb6, 0x92, 0x4f, 0xa0, 0xa7, 0x9b, 0x02, 0xa2, 0x47, 0x2f, 0x1b, 0xd1, 0xb8,
0x1b, 0x8c, 0x77, 0x28, 0x1c, 0xa2, 0x8d, 0x98, 0x7e, 0x40, 0x2f, 0xd0, 0xb8, 0x2d, 0xf9, 0x0d,
0x74, 0xcb, 0x2a, 0x47, 0x6a, 0x3b, 0x9e, 0x5f, 0xf8, 0x66, 0x4d, 0x9f, 0x42, 0x73, 0x46, 0x55,
0x8e, 0x21, 0x41, 0xdc, 0x83, 0x4e, 0xaa, 0xa9, 0xfa, 0x70, 0xee, 0xb6, 0x68, 0xbb, 0x77, 0xd8,
0x49, 0xf5, 0xd4, 0x83, 0xb3, 0xd3, 0x25, 0xde, 0x07, 0xfb, 0xf9, 0x25, 0x72, 0x2d, 0x0e, 0xe0,
0x3c, 0x04, 0x4f, 0x41, 0x14, 0xb8, 0xec, 0x5e, 0xec, 0x8f, 0xd2, 0x3a, 0x1c, 0xa5, 0xb5, 0xaf,
0x25, 0x3b, 0xd4, 0x92, 0x7d, 0xd7, 0x92, 0x7d, 0xfe, 0x48, 0x6b, 0xe1, 0xd0, 0x5f, 0xde, 0xfd,
0x06, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x21, 0x8f, 0x2c, 0x75, 0x01, 0x00, 0x00,
// 303 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
0x14, 0x86, 0x3b, 0x14, 0x0a, 0x3e, 0x08, 0x36, 0x13, 0x12, 0x27, 0x2e, 0x26, 0x95, 0x8d, 0x18,
0x13, 0x4c, 0xf0, 0x06, 0xc6, 0xae, 0x70, 0x61, 0x1a, 0x74, 0x4b, 0x4a, 0x79, 0x21, 0xa4, 0x94,
0x69, 0x4a, 0x9d, 0xa4, 0x37, 0x71, 0xef, 0xde, 0x73, 0xb0, 0xe4, 0x08, 0x52, 0x2f, 0x62, 0xfa,
0xc6, 0xe2, 0xc6, 0xcd, 0xe4, 0xfd, 0xff, 0xff, 0x65, 0xe6, 0x7f, 0x03, 0x9d, 0x58, 0x8f, 0xd3,
0x4c, 0xe5, 0x8a, 0x3b, 0x89, 0x8e, 0xa2, 0x74, 0x71, 0x39, 0x58, 0xa9, 0x95, 0x22, 0xeb, 0xae,
0x9a, 0x4c, 0x3a, 0xfc, 0x64, 0xd0, 0x99, 0x62, 0xf1, 0x1a, 0x6e, 0xde, 0x90, 0xbb, 0x60, 0xc7,
0x58, 0x08, 0xe6, 0xb1, 0x51, 0x2f, 0xa8, 0x46, 0x7e, 0x0d, 0xe7, 0x51, 0x86, 0x61, 0x8e, 0xf3,
0x0c, 0xf5, 0x7a, 0xb7, 0x56, 0x5b, 0xd1, 0xf0, 0xd8, 0xc8, 0x0e, 0xfa, 0xc6, 0x0e, 0x7e, 0x5d,
0x7e, 0x05, 0xbd, 0x44, 0x2d, 0xff, 0x28, 0x9b, 0xa8, 0x6e, 0xa2, 0x96, 0x27, 0x44, 0x40, 0x5b,
0x63, 0x46, 0x69, 0x93, 0xd2, 0x5a, 0xf2, 0x01, 0xb4, 0x74, 0x55, 0x40, 0xb4, 0xe8, 0x65, 0x23,
0x2a, 0x77, 0x83, 0xe1, 0x0e, 0x85, 0x43, 0xb4, 0x11, 0xc3, 0x0f, 0x06, 0x2d, 0x5f, 0xe3, 0x36,
0xe7, 0xb7, 0xd0, 0xcc, 0x8b, 0x14, 0xa9, 0x6e, 0x7f, 0x72, 0x31, 0x36, 0x7b, 0x8e, 0x29, 0x34,
0xe7, 0xac, 0x48, 0x31, 0x20, 0x88, 0x7b, 0xd0, 0x88, 0x35, 0x75, 0xef, 0x4e, 0xdc, 0x1a, 0xad,
0x17, 0x0f, 0x1a, 0xb1, 0xe6, 0x37, 0xd0, 0x4e, 0x33, 0xd4, 0xf3, 0x58, 0x53, 0xf9, 0xff, 0x30,
0xa7, 0x02, 0xa6, 0x7a, 0xe8, 0xc1, 0xd9, 0xe9, 0x7e, 0xde, 0x06, 0xfb, 0xf9, 0x65, 0xe6, 0x5a,
0x1c, 0xc0, 0x79, 0xf4, 0x9f, 0xfc, 0x99, 0xef, 0xb2, 0x07, 0xb1, 0x3f, 0x4a, 0xeb, 0x70, 0x94,
0xd6, 0xbe, 0x94, 0xec, 0x50, 0x4a, 0xf6, 0x55, 0x4a, 0xf6, 0xfe, 0x2d, 0xad, 0x85, 0x43, 0xff,
0x7e, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x45, 0x92, 0x5d, 0xa1, 0x01, 0x00, 0x00,
}

View File

@@ -43,4 +43,6 @@ message Event {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}

View File

@@ -38,9 +38,12 @@ type PageWriter struct {
bufWatermarkBytes int
}
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
// NewPageWriter creates a new PageWriter. pageBytes is the number of bytes
// to write per page. pageOffset is the starting offset of io.Writer.
func NewPageWriter(w io.Writer, pageBytes, pageOffset int) *PageWriter {
return &PageWriter{
w: w,
pageOffset: pageOffset,
pageBytes: pageBytes,
buf: make([]byte, defaultBufferBytes+pageBytes),
bufWatermarkBytes: defaultBufferBytes,
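
The extra parameter matters when appending to an existing file: the writer needs the real starting offset so its padding keeps page boundaries aligned with the underlying file rather than with the start of the stream. A sketch of the intended call pattern (hypothetical helper; the wal package's newFileEncoder below does the same thing):

// newAlignedPageWriter starts a PageWriter at the file's current offset.
func newAlignedPageWriter(f *os.File, pageBytes int) (*ioutil.PageWriter, error) {
	offset, err := f.Seek(0, os.SEEK_CUR) // where the next write lands
	if err != nil {
		return nil, err
	}
	return ioutil.NewPageWriter(f, pageBytes, int(offset)), nil
}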

View File

@@ -25,9 +25,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.10"
Version = "3.0.12"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"

View File

@@ -18,6 +18,7 @@ import (
"encoding/binary"
"hash"
"io"
"os"
"sync"
"github.com/coreos/etcd/pkg/crc"
@@ -39,9 +40,9 @@ type encoder struct {
uint64buf []byte
}
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
return &encoder{
bw: ioutil.NewPageWriter(w, walPageBytes),
bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
crc: crc.New(prevCrc, crcTable),
// 1MB buffer
buf: make([]byte, 1024*1024),
@@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
}
}
// newFileEncoder creates a new encoder with current file offset for the page writer.
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return nil, err
}
return newEncoder(f, prevCrc, int(offset)), nil
}
func (e *encoder) encode(rec *walpb.Record) error {
e.mu.Lock()
defer e.mu.Unlock()

View File

@@ -120,7 +120,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
w := &WAL{
dir: dirpath,
metadata: metadata,
encoder: newEncoder(f, 0),
}
w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
}
w.locks = append(w.locks, f)
if err = w.saveCrc(0); err != nil {
@@ -341,7 +344,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
if w.tail() != nil {
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
if err != nil {
return
}
}
w.decoder = nil
@@ -375,7 +381,10 @@ func (w *WAL) cut() error {
// update writer and save the previous crc
w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
if err = w.saveCrc(prevCrc); err != nil {
return err
}
@@ -414,7 +423,10 @@
w.locks[len(w.locks)-1] = newTail
prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath)
return nil

View File

@@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@@ -61,7 +61,7 @@ func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []s
}
resp, err := p.tr.RoundTrip(req)
if err != nil {
s.recordFailure()
s.recordFailure(err)
pinned = (pinned + 1) % len(endpoints)
continue
}
@@ -71,7 +71,7 @@
err = d.Decode(&hh)
resp.Body.Close()
if err != nil || !hh.OK {
s.recordFailure()
s.recordFailure(err)
pinned = (pinned + 1) % len(endpoints)
continue
}

View File

@@ -14,6 +14,7 @@ type Status interface {
Total() int64
Loss() int64
Health() bool
Err() error
// Estimated smoothed round trip time
SRTT() time.Duration
// Estimated clock difference
@@ -27,6 +28,7 @@ type status struct {
total int64
loss int64
health bool
err error
clockdiff time.Duration
stopC chan struct{}
}
@@ -56,6 +58,12 @@ func (s *status) Health() bool {
return s.health
}
func (s *status) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.err
}
func (s *status) ClockDiff() time.Duration {
s.mu.Lock()
defer s.mu.Unlock()
@@ -74,15 +82,17 @@ func (s *status) record(rtt time.Duration, when time.Time) {
s.health = true
s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt))
s.clockdiff = time.Now().Sub(when) - s.srtt/2
s.err = nil
}
func (s *status) recordFailure() {
func (s *status) recordFailure(err error) {
s.mu.Lock()
defer s.mu.Unlock()
s.total++
s.health = false
s.loss += 1
s.err = err
}
func (s *status) reset() {
@@ -91,6 +101,8 @@ func (s *status) reset() {
s.srtt = 0
s.total = 0
s.loss = 0
s.health = false
s.clockdiff = 0
s.err = nil
}
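
With the failure recorded, callers can now report why a probe is unhealthy rather than only that it is. A hedged usage sketch against this package (probing here is github.com/xiang90/probing; the id and endpoint URL are hypothetical):

p := probing.NewProber(http.DefaultTransport)
p.AddHTTP("peer", 5*time.Second, []string{"http://127.0.0.1:2380/raft/probing"})
time.Sleep(15 * time.Second) // allow a few probes to complete
if s, err := p.Status("peer"); err == nil && !s.Health() {
	fmt.Printf("peer unhealthy: %v\n", s.Err())
}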