Merge pull request #34246 from hongchaodeng/etcddep

Automatic merge from submit-queue

etcd3: use PrevKV to remove additional get

ref: https://github.com/kubernetes/kubernetes/issues/33653

We are trying to test the PrevKV feature and see whether it improves performance.
In order to test this, we will need the etcd v3.1 (alpha) image.

Blockers:
- update gcr.io image (version v3.0.12)
This commit is contained in:
Kubernetes Submit Queue 2016-10-11 01:16:59 -07:00 committed by GitHub
commit 0b627334df
27 changed files with 1139 additions and 776 deletions

210
Godeps/Godeps.json generated
View File

@ -353,263 +353,263 @@
}, },
{ {
"ImportPath": "github.com/coreos/etcd/alarm", "ImportPath": "github.com/coreos/etcd/alarm",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/auth", "ImportPath": "github.com/coreos/etcd/auth",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/auth/authpb", "ImportPath": "github.com/coreos/etcd/auth/authpb",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/client", "ImportPath": "github.com/coreos/etcd/client",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/clientv3", "ImportPath": "github.com/coreos/etcd/clientv3",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/compactor", "ImportPath": "github.com/coreos/etcd/compactor",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/discovery", "ImportPath": "github.com/coreos/etcd/discovery",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/error", "ImportPath": "github.com/coreos/etcd/error",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver", "ImportPath": "github.com/coreos/etcd/etcdserver",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/api", "ImportPath": "github.com/coreos/etcd/etcdserver/api",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http", "ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http/httptypes", "ImportPath": "github.com/coreos/etcd/etcdserver/api/v2http/httptypes",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc", "ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes", "ImportPath": "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/auth", "ImportPath": "github.com/coreos/etcd/etcdserver/auth",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/etcdserverpb", "ImportPath": "github.com/coreos/etcd/etcdserver/etcdserverpb",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/membership", "ImportPath": "github.com/coreos/etcd/etcdserver/membership",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/etcdserver/stats", "ImportPath": "github.com/coreos/etcd/etcdserver/stats",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/integration", "ImportPath": "github.com/coreos/etcd/integration",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/lease", "ImportPath": "github.com/coreos/etcd/lease",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/lease/leasehttp", "ImportPath": "github.com/coreos/etcd/lease/leasehttp",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/lease/leasepb", "ImportPath": "github.com/coreos/etcd/lease/leasepb",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/mvcc", "ImportPath": "github.com/coreos/etcd/mvcc",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/mvcc/backend", "ImportPath": "github.com/coreos/etcd/mvcc/backend",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/mvcc/mvccpb", "ImportPath": "github.com/coreos/etcd/mvcc/mvccpb",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/adt", "ImportPath": "github.com/coreos/etcd/pkg/adt",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/contention", "ImportPath": "github.com/coreos/etcd/pkg/contention",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/crc", "ImportPath": "github.com/coreos/etcd/pkg/crc",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/fileutil", "ImportPath": "github.com/coreos/etcd/pkg/fileutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/httputil", "ImportPath": "github.com/coreos/etcd/pkg/httputil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/idutil", "ImportPath": "github.com/coreos/etcd/pkg/idutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/ioutil", "ImportPath": "github.com/coreos/etcd/pkg/ioutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/logutil", "ImportPath": "github.com/coreos/etcd/pkg/logutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/netutil", "ImportPath": "github.com/coreos/etcd/pkg/netutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/pathutil", "ImportPath": "github.com/coreos/etcd/pkg/pathutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/pbutil", "ImportPath": "github.com/coreos/etcd/pkg/pbutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/runtime", "ImportPath": "github.com/coreos/etcd/pkg/runtime",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/schedule", "ImportPath": "github.com/coreos/etcd/pkg/schedule",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/testutil", "ImportPath": "github.com/coreos/etcd/pkg/testutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/tlsutil", "ImportPath": "github.com/coreos/etcd/pkg/tlsutil",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/transport", "ImportPath": "github.com/coreos/etcd/pkg/transport",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/types", "ImportPath": "github.com/coreos/etcd/pkg/types",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/pkg/wait", "ImportPath": "github.com/coreos/etcd/pkg/wait",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/raft", "ImportPath": "github.com/coreos/etcd/raft",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/raft/raftpb", "ImportPath": "github.com/coreos/etcd/raft/raftpb",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/rafthttp", "ImportPath": "github.com/coreos/etcd/rafthttp",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/snap", "ImportPath": "github.com/coreos/etcd/snap",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/snap/snappb", "ImportPath": "github.com/coreos/etcd/snap/snappb",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/store", "ImportPath": "github.com/coreos/etcd/store",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/version", "ImportPath": "github.com/coreos/etcd/version",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/wal", "ImportPath": "github.com/coreos/etcd/wal",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/etcd/wal/walpb", "ImportPath": "github.com/coreos/etcd/wal/walpb",
"Comment": "v3.0.10", "Comment": "v3.0.12",
"Rev": "546c0f7ed65523c24390f0f26c7e4af2232f52d2" "Rev": "2d1e2e8e646e65f73ca0d9ee905a5343d6135a50"
}, },
{ {
"ImportPath": "github.com/coreos/go-oidc/http", "ImportPath": "github.com/coreos/go-oidc/http",
@ -2252,7 +2252,7 @@
}, },
{ {
"ImportPath": "github.com/xiang90/probing", "ImportPath": "github.com/xiang90/probing",
"Rev": "6a0cc1ae81b4cc11db5e491e030e4b98fba79c19" "Rev": "07dd2e8dfe18522e9c447ba95f2fe95262f63bb2"
}, },
{ {
"ImportPath": "github.com/xyproto/simpleredis", "ImportPath": "github.com/xyproto/simpleredis",

View File

@ -24,15 +24,17 @@ import (
type event struct { type event struct {
key string key string
value []byte value []byte
prevValue []byte
rev int64 rev int64
isDeleted bool isDeleted bool
isCreated bool isCreated bool
} }
func parseKV(kv *mvccpb.KeyValue) *event { func parseKV(kv *mvccpb.KeyValue, prevVal []byte) *event {
return &event{ return &event{
key: string(kv.Key), key: string(kv.Key),
value: kv.Value, value: kv.Value,
prevValue: prevVal,
rev: kv.ModRevision, rev: kv.ModRevision,
isDeleted: false, isDeleted: false,
isCreated: kv.ModRevision == kv.CreateRevision, isCreated: kv.ModRevision == kv.CreateRevision,
@ -40,11 +42,15 @@ func parseKV(kv *mvccpb.KeyValue) *event {
} }
func parseEvent(e *clientv3.Event) *event { func parseEvent(e *clientv3.Event) *event {
return &event{ ret := &event{
key: string(e.Kv.Key), key: string(e.Kv.Key),
value: e.Kv.Value, value: e.Kv.Value,
rev: e.Kv.ModRevision, rev: e.Kv.ModRevision,
isDeleted: e.Type == clientv3.EventTypeDelete, isDeleted: e.Type == clientv3.EventTypeDelete,
isCreated: e.IsCreate(), isCreated: e.IsCreate(),
} }
if e.PrevKv != nil {
ret.prevValue = e.PrevKv.Value
}
return ret
} }

View File

@ -154,7 +154,15 @@ func (wc *watchChan) sync() error {
wc.initialRev = getResp.Header.Revision wc.initialRev = getResp.Header.Revision
for _, kv := range getResp.Kvs { for _, kv := range getResp.Kvs {
wc.sendEvent(parseKV(kv)) prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1))
if err != nil {
return err
}
var prevVal []byte
if len(prevResp.Kvs) > 0 {
prevVal = prevResp.Kvs[0].Value
}
wc.sendEvent(parseKV(kv, prevVal))
} }
return nil return nil
} }
@ -170,7 +178,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
return return
} }
} }
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1)} opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
if wc.recursive { if wc.recursive {
opts = append(opts, clientv3.WithPrefix()) opts = append(opts, clientv3.WithPrefix())
} }
@ -331,16 +339,12 @@ func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec r
return nil, nil, err return nil, nil, err
} }
} }
if e.isDeleted || !e.isCreated { if len(e.prevValue) > 0 {
getResp, err := client.Get(ctx, e.key, clientv3.WithRev(e.rev-1), clientv3.WithSerializable())
if err != nil {
return nil, nil, err
}
// Note that this sends the *old* object with the etcd revision for the time at // Note that this sends the *old* object with the etcd revision for the time at
// which it gets deleted. // which it gets deleted.
// We assume old object is returned only in Deleted event. Users (e.g. cacher) need // We assume old object is returned only in Deleted event. Users (e.g. cacher) need
// to have larger than previous rev to tell the ordering. // to have larger than previous rev to tell the ordering.
oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, e.rev) oldObj, err = decodeObj(codec, versioner, e.prevValue, e.rev)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -21,9 +21,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal

View File

@ -157,14 +157,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
} }
case tPut: case tPut:
var resp *pb.PutResponse var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
resp, err = kv.remote.Put(ctx, r) resp, err = kv.remote.Put(ctx, r)
if err == nil { if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil return OpResponse{put: (*PutResponse)(resp)}, nil
} }
case tDeleteRange: case tDeleteRange:
var resp *pb.DeleteRangeResponse var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r) resp, err = kv.remote.DeleteRange(ctx, r)
if err == nil { if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil return OpResponse{del: (*DeleteResponse)(resp)}, nil

View File

@ -47,6 +47,9 @@ type Op struct {
// for range, watch // for range, watch
rev int64 rev int64
// for watch, put, delete
prevKV bool
// progressNotify is for progress updates. // progressNotify is for progress updates.
progressNotify bool progressNotify bool
@ -73,10 +76,10 @@ func (op Op) toRequestOp() *pb.RequestOp {
} }
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}} return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}}
case tPut: case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}} return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
case tDeleteRange: case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}} return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
default: default:
panic("Unknown Op") panic("Unknown Op")
@ -271,3 +274,11 @@ func WithProgressNotify() OpOption {
op.progressNotify = true op.progressNotify = true
} }
} }
// WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
// nothing will be returned.
func WithPrevKV() OpOption {
return func(op *Op) {
op.prevKV = true
}
}

View File

@ -61,6 +61,9 @@ type WatchResponse struct {
// the channel sends a final response that has Canceled set to true with a non-nil Err(). // the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool Canceled bool
// created is used to indicate the creation of the watcher.
created bool
closeErr error closeErr error
} }
@ -89,7 +92,7 @@ func (wr *WatchResponse) Err() error {
// IsProgressNotify returns true if the WatchResponse is progress notification. // IsProgressNotify returns true if the WatchResponse is progress notification.
func (wr *WatchResponse) IsProgressNotify() bool { func (wr *WatchResponse) IsProgressNotify() bool {
return len(wr.Events) == 0 && !wr.Canceled return len(wr.Events) == 0 && !wr.Canceled && !wr.created && wr.CompactRevision == 0 && wr.Header.Revision != 0
} }
// watcher implements the Watcher interface // watcher implements the Watcher interface
@ -102,6 +105,7 @@ type watcher struct {
streams map[string]*watchGrpcStream streams map[string]*watchGrpcStream
} }
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct { type watchGrpcStream struct {
owner *watcher owner *watcher
remote pb.WatchClient remote pb.WatchClient
@ -112,10 +116,10 @@ type watchGrpcStream struct {
ctxKey string ctxKey string
cancel context.CancelFunc cancel context.CancelFunc
// mu protects the streams map // substreams holds all active watchers on this grpc stream
mu sync.RWMutex substreams map[int64]*watcherStream
// streams holds all active watchers // resuming holds all resuming watchers on this grpc stream
streams map[int64]*watcherStream resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine // reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest reqc chan *watchRequest
@ -130,7 +134,9 @@ type watchGrpcStream struct {
// closingc gets the watcherStream of closing watchers // closingc gets the watcherStream of closing watchers
closingc chan *watcherStream closingc chan *watcherStream
// the error that closed the watch stream // resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error closeErr error
} }
@ -142,6 +148,8 @@ type watchRequest struct {
rev int64 rev int64
// progressNotify is for progress updates. // progressNotify is for progress updates.
progressNotify bool progressNotify bool
// get the previous key-value pair before the event happens
prevKV bool
// retc receives a chan WatchResponse once the watcher is established // retc receives a chan WatchResponse once the watcher is established
retc chan chan WatchResponse retc chan chan WatchResponse
} }
@ -152,15 +160,18 @@ type watcherStream struct {
initReq watchRequest initReq watchRequest
// outc publishes watch responses to subscriber // outc publishes watch responses to subscriber
outc chan<- WatchResponse outc chan WatchResponse
// recvc buffers watch responses before publishing // recvc buffers watch responses before publishing
recvc chan *WatchResponse recvc chan *WatchResponse
id int64 // donec closes when the watcherStream goroutine stops.
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing bool
// id is the registered watch id on the grpc stream
id int64
// lastRev is revision last successfully sent over outc // buf holds all events received from etcd but not yet consumed by the client
lastRev int64 buf []*WatchResponse
// resumec indicates the stream must recover at a given revision
resumec chan int64
} }
func NewWatcher(c *Client) Watcher { func NewWatcher(c *Client) Watcher {
@ -184,12 +195,12 @@ func (vc *valCtx) Err() error { return nil }
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx}) ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{ wgs := &watchGrpcStream{
owner: w, owner: w,
remote: w.remote, remote: w.remote,
ctx: ctx, ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx), ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel, cancel: cancel,
streams: make(map[int64]*watcherStream), substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse), respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest), reqc: make(chan *watchRequest),
@ -197,6 +208,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
donec: make(chan struct{}), donec: make(chan struct{}),
errc: make(chan error, 1), errc: make(chan error, 1),
closingc: make(chan *watcherStream), closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
} }
go wgs.run() go wgs.run()
return wgs return wgs
@ -206,14 +218,14 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...) ow := opWatch(key, opts...)
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{ wr := &watchRequest{
ctx: ctx, ctx: ctx,
key: string(ow.key), key: string(ow.key),
end: string(ow.end), end: string(ow.end),
rev: ow.rev, rev: ow.rev,
progressNotify: ow.progressNotify, progressNotify: ow.progressNotify,
retc: retc, prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
} }
ok := false ok := false
@ -257,7 +269,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
// receive channel // receive channel
if ok { if ok {
select { select {
case ret := <-retc: case ret := <-wr.retc:
return ret return ret
case <-ctx.Done(): case <-ctx.Done():
case <-donec: case <-donec:
@ -288,12 +300,7 @@ func (w *watcher) Close() (err error) {
} }
func (w *watchGrpcStream) Close() (err error) { func (w *watchGrpcStream) Close() (err error) {
w.mu.Lock() close(w.stopc)
if w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
<-w.donec <-w.donec
select { select {
case err = <-w.errc: case err = <-w.errc:
@ -302,71 +309,57 @@ func (w *watchGrpcStream) Close() (err error) {
return toErr(w.ctx, err) return toErr(w.ctx, err)
} }
func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { func (w *watcher) closeStream(wgs *watchGrpcStream) {
if pendingReq == nil {
// no pending request; ignore
return
}
if resp.Canceled || resp.CompactRevision != 0 {
// a cancel at id creation time means the start revision has
// been compacted out of the store
ret := make(chan WatchResponse, 1)
ret <- WatchResponse{
Header: *resp.Header,
CompactRevision: resp.CompactRevision,
Canceled: true}
close(ret)
pendingReq.retc <- ret
return
}
ret := make(chan WatchResponse)
if resp.WatchId == -1 {
// failed; no channel
close(ret)
pendingReq.retc <- ret
return
}
ws := &watcherStream{
initReq: *pendingReq,
id: resp.WatchId,
outc: ret,
// buffered so unlikely to block on sending while holding mu
recvc: make(chan *WatchResponse, 4),
resumec: make(chan int64),
}
if pendingReq.rev == 0 {
// note the header revision so that a put following a current watcher
// disconnect will arrive on the watcher channel after reconnect
ws.initReq.rev = resp.Header.Revision
}
w.mu.Lock() w.mu.Lock()
w.streams[ws.id] = ws close(wgs.donec)
wgs.cancel()
if w.streams != nil {
delete(w.streams, wgs.ctxKey)
}
w.mu.Unlock() w.mu.Unlock()
// pass back the subscriber channel for the watcher
pendingReq.retc <- ret
// send messages to subscriber
go w.serveStream(ws)
} }
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool { func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
w.mu.Lock() if resp.WatchId == -1 {
// cancels request stream; subscriber receives nil channel // failed; no channel
close(ws.initReq.retc) close(ws.recvc)
// close subscriber's channel return
close(ws.outc) }
delete(w.streams, ws.id) ws.id = resp.WatchId
empty := len(w.streams) == 0 w.substreams[ws.id] = ws
if empty && w.stopc != nil { }
w.stopc = nil
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
select {
case ws.outc <- *resp:
case <-ws.initReq.ctx.Done():
case <-time.After(closeSendErrTimeout):
}
close(ws.outc)
}
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
// send channel response in case stream was never established
select {
case ws.initReq.retc <- ws.outc:
default:
}
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
} else {
close(ws.outc)
}
if ws.id != -1 {
delete(w.substreams, ws.id)
return
}
for i := range w.resuming {
if w.resuming[i] == ws {
w.resuming[i] = nil
return
}
} }
w.mu.Unlock()
return empty
} }
// run is the root of the goroutines for managing a watcher client // run is the root of the goroutines for managing a watcher client
@ -374,66 +367,79 @@ func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient var wc pb.Watch_WatchClient
var closeErr error var closeErr error
defer func() { // substreams marked to close but goroutine still running; needed for
w.owner.mu.Lock() // avoiding double-closing recvc on grpc stream teardown
w.closeErr = closeErr closing := make(map[*watcherStream]struct{})
if w.owner.streams != nil {
delete(w.owner.streams, w.ctxKey)
}
close(w.donec)
w.owner.mu.Unlock()
w.cancel()
}()
// already stopped? defer func() {
w.mu.RLock() w.closeErr = closeErr
stopc := w.stopc // shutdown substreams and resuming substreams
w.mu.RUnlock() for _, ws := range w.substreams {
if stopc == nil { if _, ok := closing[ws]; !ok {
return close(ws.recvc)
} }
}
for _, ws := range w.resuming {
if _, ok := closing[ws]; ws != nil && !ok {
close(ws.recvc)
}
}
w.joinSubstreams()
for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- {
w.closeSubstream(<-w.closingc)
}
w.owner.closeStream(w)
}()
// start a stream with the etcd grpc server // start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil { if wc, closeErr = w.newWatchClient(); closeErr != nil {
return return
} }
var pendingReq, failedReq *watchRequest
curReqC := w.reqc
cancelSet := make(map[int64]struct{}) cancelSet := make(map[int64]struct{})
for { for {
select { select {
// Watch() requested // Watch() requested
case pendingReq = <-curReqC: case wreq := <-w.reqc:
// no more watch requests until there's a response outc := make(chan WatchResponse, 1)
curReqC = nil ws := &watcherStream{
if err := wc.Send(pendingReq.toPB()); err == nil { initReq: *wreq,
// pendingReq now waits on w.respc id: -1,
break outc: outc,
// unbufffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
} }
failedReq = pendingReq
// New events from the watch client // New events from the watch client
case pbresp := <-w.respc: case pbresp := <-w.respc:
switch { switch {
case pbresp.Created: case pbresp.Created:
// response to pending req, try to add // response to head of queue creation
w.addStream(pbresp, pendingReq) if ws := w.resuming[0]; ws != nil {
pendingReq = nil w.addSubstream(pbresp, ws)
curReqC = w.reqc w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
case pbresp.Canceled: case pbresp.Canceled:
delete(cancelSet, pbresp.WatchId) delete(cancelSet, pbresp.WatchId)
// shutdown serveStream, if any if ws, ok := w.substreams[pbresp.WatchId]; ok {
w.mu.Lock() // signal to stream goroutine to update closingc
if ws, ok := w.streams[pbresp.WatchId]; ok {
close(ws.recvc) close(ws.recvc)
delete(w.streams, ws.id) closing[ws] = struct{}{}
}
numStreams := len(w.streams)
w.mu.Unlock()
if numStreams == 0 {
// don't leak watcher streams
return
} }
default: default:
// dispatch to appropriate watch stream // dispatch to appropriate watch stream
@ -454,7 +460,6 @@ func (w *watchGrpcStream) run() {
wc.Send(req) wc.Send(req)
} }
// watch client failed to recv; spawn another if possible // watch client failed to recv; spawn another if possible
// TODO report watch client errors from errc?
case err := <-w.errc: case err := <-w.errc:
if toErr(w.ctx, err) == v3rpc.ErrNoLeader { if toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err closeErr = err
@ -463,52 +468,58 @@ func (w *watchGrpcStream) run() {
if wc, closeErr = w.newWatchClient(); closeErr != nil { if wc, closeErr = w.newWatchClient(); closeErr != nil {
return return
} }
curReqC = w.reqc if ws := w.nextResume(); ws != nil {
if pendingReq != nil { wc.Send(ws.initReq.toPB())
failedReq = pendingReq
} }
cancelSet = make(map[int64]struct{}) cancelSet = make(map[int64]struct{})
case <-stopc: case <-w.stopc:
return return
case ws := <-w.closingc: case ws := <-w.closingc:
if w.closeStream(ws) { w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown
return return
} }
} }
// send failed; queue for retry
if failedReq != nil {
go func(wr *watchRequest) {
select {
case w.reqc <- wr:
case <-wr.ctx.Done():
case <-w.donec:
}
}(pendingReq)
failedReq = nil
pendingReq = nil
}
} }
} }
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
for len(w.resuming) != 0 {
if w.resuming[0] != nil {
return w.resuming[0]
}
w.resuming = w.resuming[1:len(w.resuming)]
}
return nil
}
// dispatchEvent sends a WatchResponse to the appropriate watcher stream // dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
w.mu.RLock() ws, ok := w.substreams[pbresp.WatchId]
defer w.mu.RUnlock() if !ok {
ws, ok := w.streams[pbresp.WatchId] return false
}
events := make([]*Event, len(pbresp.Events)) events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events { for i, ev := range pbresp.Events {
events[i] = (*Event)(ev) events[i] = (*Event)(ev)
} }
if ok { wr := &WatchResponse{
wr := &WatchResponse{ Header: *pbresp.Header,
Header: *pbresp.Header, Events: events,
Events: events, CompactRevision: pbresp.CompactRevision,
CompactRevision: pbresp.CompactRevision, created: pbresp.Created,
Canceled: pbresp.Canceled} Canceled: pbresp.Canceled,
ws.recvc <- wr
} }
return ok select {
case ws.recvc <- wr:
case <-ws.donec:
return false
}
return true
} }
// serveWatchClient forwards messages from the grpc stream to run() // serveWatchClient forwards messages from the grpc stream to run()
@ -530,132 +541,123 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
} }
} }
// serveStream forwards watch responses from run() to the subscriber // serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) { func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
if ws.closing {
panic("created substream goroutine but substream is closing")
}
// nextRev is the minimum expected next revision
nextRev := ws.initReq.rev
resuming := false
defer func() { defer func() {
// signal that this watcherStream is finished if !resuming {
select { ws.closing = true
case w.closingc <- ws: }
case <-w.donec: close(ws.donec)
w.closeStream(ws) if !resuming {
w.closingc <- ws
} }
}() }()
var closeErr error
emptyWr := &WatchResponse{} emptyWr := &WatchResponse{}
wrs := []*WatchResponse{} for {
resuming := false
closing := false
for !closing {
curWr := emptyWr curWr := emptyWr
outc := ws.outc outc := ws.outc
if len(wrs) > 0 {
curWr = wrs[0] if len(ws.buf) > 0 && ws.buf[0].created {
select {
case ws.initReq.retc <- ws.outc:
default:
}
ws.buf = ws.buf[1:]
}
if len(ws.buf) > 0 {
curWr = ws.buf[0]
} else { } else {
outc = nil outc = nil
} }
select { select {
case outc <- *curWr: case outc <- *curWr:
if wrs[0].Err() != nil { if ws.buf[0].Err() != nil {
closing = true
break
}
var newRev int64
if len(wrs[0].Events) > 0 {
newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
} else {
newRev = wrs[0].Header.Revision
}
if newRev != ws.lastRev {
ws.lastRev = newRev
}
wrs[0] = nil
wrs = wrs[1:]
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeStream
return return
} }
// resume up to last seen event if disconnected ws.buf[0] = nil
if resuming && wr.Err() == nil { ws.buf = ws.buf[1:]
resuming = false case wr, ok := <-ws.recvc:
// trim events already seen if !ok {
for i := 0; i < len(wr.Events); i++ { // shutdown from closeSubstream
if wr.Events[i].Kv.ModRevision > ws.lastRev { return
wr.Events = wr.Events[i:]
break
}
}
// only forward new events
if wr.Events[0].Kv.ModRevision == ws.lastRev {
break
}
} }
resuming = false // TODO pause channel if buffer gets too large
// TODO don't keep buffering if subscriber stops reading ws.buf = append(ws.buf, wr)
wrs = append(wrs, wr) nextRev = wr.Header.Revision
case resumeRev := <-ws.resumec: if len(wr.Events) > 0 {
wrs = nil nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
resuming = true
if resumeRev == -1 {
// pause serving stream while resume gets set up
break
} }
if resumeRev != ws.lastRev { ws.initReq.rev = nextRev
panic("unexpected resume revision")
}
case <-w.donec:
closing = true
closeErr = w.closeErr
case <-ws.initReq.ctx.Done(): case <-ws.initReq.ctx.Done():
closing = true return
case <-resumec:
resuming = true
return
} }
} }
// try to send off close error
if closeErr != nil {
select {
case ws.outc <- WatchResponse{closeErr: w.closeErr}:
case <-w.donec:
case <-time.After(closeSendErrTimeout):
}
}
// lazily send cancel message if events on missing id // lazily send cancel message if events on missing id
} }
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume() // connect to grpc stream
if rerr != nil { wc, err := w.openWatchClient()
return nil, rerr if err != nil {
return nil, v3rpc.Error(err)
} }
go w.serveWatchClient(ws) // mark all substreams as resuming
return ws, nil if len(w.substreams)+len(w.resuming) > 0 {
} close(w.resumec)
w.resumec = make(chan struct{})
// resume creates a new WatchClient with all current watchers reestablished w.joinSubstreams()
func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) { for _, ws := range w.substreams {
for { ws.id = -1
if ws, err = w.openWatchClient(); err != nil { w.resuming = append(w.resuming, ws)
break }
} else if err = w.resumeWatchers(ws); err == nil { for _, ws := range w.resuming {
break if ws == nil || ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
}
w.substreams = make(map[int64]*watcherStream)
// receive data from new grpc stream
go w.serveWatchClient(wc)
return wc, nil
}
// joinSubstream waits for all substream goroutines to complete
func (w *watchGrpcStream) joinSubstreams() {
for _, ws := range w.substreams {
<-ws.donec
}
for _, ws := range w.resuming {
if ws != nil {
<-ws.donec
} }
} }
return ws, v3rpc.Error(err)
} }
// openWatchClient retries opening a watchclient until retryConnection fails // openWatchClient retries opening a watchclient until retryConnection fails
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for { for {
w.mu.Lock() select {
stopc := w.stopc case <-w.stopc:
w.mu.Unlock()
if stopc == nil {
if err == nil { if err == nil {
err = context.Canceled return nil, context.Canceled
} }
return nil, err return nil, err
default:
} }
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
break break
@ -667,63 +669,6 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
return ws, nil return ws, nil
} }
// resumeWatchers rebuilds every registered watcher on a new client
func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RLock()
streams := make([]*watcherStream, 0, len(w.streams))
for _, ws := range w.streams {
streams = append(streams, ws)
}
w.mu.RUnlock()
for _, ws := range streams {
// drain recvc so no old WatchResponses (e.g., Created messages)
// are processed while resuming
ws.drain()
// pause serveStream
ws.resumec <- -1
// reconstruct watcher from initial request
if ws.lastRev != 0 {
ws.initReq.rev = ws.lastRev
}
if err := wc.Send(ws.initReq.toPB()); err != nil {
return err
}
// wait for request ack
resp, err := wc.Recv()
if err != nil {
return err
} else if len(resp.Events) != 0 || !resp.Created {
return fmt.Errorf("watcher: unexpected response (%+v)", resp)
}
// id may be different since new remote watcher; update map
w.mu.Lock()
delete(w.streams, ws.id)
ws.id = resp.WatchId
w.streams[ws.id] = ws
w.mu.Unlock()
// unpause serveStream
ws.resumec <- ws.lastRev
}
return nil
}
// drain removes all buffered WatchResponses from the stream's receive channel.
func (ws *watcherStream) drain() {
for {
select {
case <-ws.recvc:
default:
return
}
}
}
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest) // toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
func (wr *watchRequest) toPB() *pb.WatchRequest { func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{ req := &pb.WatchCreateRequest{
@ -731,6 +676,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
Key: []byte(wr.key), Key: []byte(wr.key),
RangeEnd: []byte(wr.end), RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify, ProgressNotify: wr.progressNotify,
PrevKv: wr.prevKV,
} }
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr} return &pb.WatchRequest{RequestUnion: cr}

View File

@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64 clusterID int64
memberID int64 memberID int64
raftTimer etcdserver.RaftTimer raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable watchable mvcc.WatchableKV
} }
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
@ -82,6 +82,8 @@ type serverWatchStream struct {
memberID int64 memberID int64
raftTimer etcdserver.RaftTimer raftTimer etcdserver.RaftTimer
watchable mvcc.WatchableKV
gRPCStream pb.Watch_WatchServer gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse ctrlStream chan *pb.WatchResponse
@ -91,6 +93,7 @@ type serverWatchStream struct {
// progress tracks the watchID that stream might need to send // progress tracks the watchID that stream might need to send
// progress to. // progress to.
progress map[mvcc.WatchID]bool progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool
// closec indicates the stream is closed. // closec indicates the stream is closed.
closec chan struct{} closec chan struct{}
@ -101,14 +104,18 @@ type serverWatchStream struct {
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{ sws := serverWatchStream{
clusterID: ws.clusterID, clusterID: ws.clusterID,
memberID: ws.memberID, memberID: ws.memberID,
raftTimer: ws.raftTimer, raftTimer: ws.raftTimer,
watchable: ws.watchable,
gRPCStream: stream, gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(), watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled. // chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool), progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}), closec: make(chan struct{}),
} }
@ -170,9 +177,14 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1 rev = wsrev + 1
} }
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
if id != -1 && creq.ProgressNotify { if id != -1 {
sws.mu.Lock() sws.mu.Lock()
sws.progress[id] = true if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
sws.mu.Unlock() sws.mu.Unlock()
} }
wr := &pb.WatchResponse{ wr := &pb.WatchResponse{
@ -198,6 +210,7 @@ func (sws *serverWatchStream) recvLoop() error {
} }
sws.mu.Lock() sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id)) delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
sws.mu.Unlock() sws.mu.Unlock()
} }
} }
@ -244,8 +257,19 @@ func (sws *serverWatchStream) sendLoop() {
// or define protocol buffer with []mvccpb.Event. // or define protocol buffer with []mvccpb.Event.
evs := wresp.Events evs := wresp.Events
events := make([]*mvccpb.Event, len(evs)) events := make([]*mvccpb.Event, len(evs))
sws.mu.Lock()
needPrevKV := sws.prevKV[wresp.WatchID]
sws.mu.Unlock()
for i := range evs { for i := range evs {
events[i] = &evs[i] events[i] = &evs[i]
if needPrevKV {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
} }
wr := &pb.WatchResponse{ wr := &pb.WatchResponse{

View File

@ -159,6 +159,22 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev int64 rev int64
err error err error
) )
var rr *mvcc.RangeResult
if p.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn { if txnID != noTxn {
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil { if err != nil {
@ -174,6 +190,9 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev = a.s.KV().Put(p.Key, p.Value, leaseID) rev = a.s.KV().Put(p.Key, p.Value, leaseID)
} }
resp.Header.Revision = rev resp.Header.Revision = rev
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
return resp, nil return resp, nil
} }
@ -191,6 +210,21 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
dr.RangeEnd = []byte{} dr.RangeEnd = []byte{}
} }
var rr *mvcc.RangeResult
if dr.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn { if txnID != noTxn {
n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd) n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
if err != nil { if err != nil {
@ -201,6 +235,11 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
} }
resp.Deleted = n resp.Deleted = n
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
}
}
resp.Header.Revision = rev resp.Header.Revision = rev
return resp, nil return resp, nil
} }

View File

@ -56,6 +56,9 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
if !aa.as.IsPutPermitted(aa.user, r.Key) { if !aa.as.IsPutPermitted(aa.user, r.Key) {
return nil, auth.ErrPermissionDenied return nil, auth.ErrPermissionDenied
} }
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, nil) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.Put(txnID, r) return aa.applierV3.Put(txnID, r)
} }
@ -70,6 +73,9 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
if !aa.as.IsDeleteRangePermitted(aa.user, r.Key, r.RangeEnd) { if !aa.as.IsDeleteRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied return nil, auth.ErrPermissionDenied
} }
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.DeleteRange(txnID, r) return aa.applierV3.DeleteRange(txnID, r)
} }
@ -99,7 +105,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) bool {
continue continue
} }
if !aa.as.IsDeleteRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) { if tv.RequestDeleteRange.PrevKv && !aa.as.IsRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
return false return false
} }
} }

View File

@ -102,9 +102,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal

View File

@ -10,9 +10,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal

File diff suppressed because it is too large Load Diff

View File

@ -396,10 +396,16 @@ message PutRequest {
// lease is the lease ID to associate with the key in the key-value store. A lease // lease is the lease ID to associate with the key in the key-value store. A lease
// value of 0 indicates no lease. // value of 0 indicates no lease.
int64 lease = 3; int64 lease = 3;
// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
} }
message PutResponse { message PutResponse {
ResponseHeader header = 1; ResponseHeader header = 1;
// if prev_kv is set in the request, the previous key-value pair will be returned.
mvccpb.KeyValue prev_kv = 2;
} }
message DeleteRangeRequest { message DeleteRangeRequest {
@ -409,12 +415,17 @@ message DeleteRangeRequest {
// If range_end is not given, the range is defined to contain only the key argument. // If range_end is not given, the range is defined to contain only the key argument.
// If range_end is '\0', the range is all keys greater than or equal to the key argument. // If range_end is '\0', the range is all keys greater than or equal to the key argument.
bytes range_end = 2; bytes range_end = 2;
// If prev_kv is set, etcd gets the previous key-value pairs before deleting it.
// The previous key-value pairs will be returned in the delte response.
bool prev_kv = 3;
} }
message DeleteRangeResponse { message DeleteRangeResponse {
ResponseHeader header = 1; ResponseHeader header = 1;
// deleted is the number of keys deleted by the delete range request. // deleted is the number of keys deleted by the delete range request.
int64 deleted = 2; int64 deleted = 2;
// if prev_kv is set in the request, the previous key-value pairs will be returned.
repeated mvccpb.KeyValue prev_kvs = 3;
} }
message RequestOp { message RequestOp {
@ -563,6 +574,9 @@ message WatchCreateRequest {
// wish to recover a disconnected watcher starting from a recent known revision. // wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load. // The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4; bool progress_notify = 4;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
} }
message WatchCancelRequest { message WatchCancelRequest {

View File

@ -551,4 +551,4 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
} }
// Watchable returns a watchable interface attached to the etcdserver. // Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.Watchable { return s.KV() } func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

View File

@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal

View File

@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
@ -89,6 +89,8 @@ type Event struct {
// A DELETE/EXPIRE event contains the deleted key with // A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion. // its modification revision set to the revision of deletion.
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"` Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// prev_kv holds the key-value pair before the event happens.
PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
} }
func (m *Event) Reset() { *m = Event{} } func (m *Event) Reset() { *m = Event{} }
@ -181,6 +183,16 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
} }
i += n1 i += n1
} }
if m.PrevKv != nil {
data[i] = 0x1a
i++
i = encodeVarintKv(data, i, uint64(m.PrevKv.Size()))
n2, err := m.PrevKv.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n2
}
return i, nil return i, nil
} }
@ -247,6 +259,10 @@ func (m *Event) Size() (n int) {
l = m.Kv.Size() l = m.Kv.Size()
n += 1 + l + sovKv(uint64(l)) n += 1 + l + sovKv(uint64(l))
} }
if m.PrevKv != nil {
l = m.PrevKv.Size()
n += 1 + l + sovKv(uint64(l))
}
return n return n
} }
@ -532,6 +548,39 @@ func (m *Event) Unmarshal(data []byte) error {
return err return err
} }
iNdEx = postIndex iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PrevKv", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowKv
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthKv
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.PrevKv == nil {
m.PrevKv = &KeyValue{}
}
if err := m.PrevKv.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipKv(data[iNdEx:]) skippy, err := skipKv(data[iNdEx:])
@ -659,23 +708,24 @@ var (
) )
var fileDescriptorKv = []byte{ var fileDescriptorKv = []byte{
// 285 bytes of a gzipped FileDescriptorProto // 303 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x44, 0x90, 0x41, 0x4a, 0xc3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
0x14, 0x86, 0x33, 0x4d, 0x9b, 0xd6, 0xd7, 0x52, 0xc3, 0x50, 0x70, 0x70, 0x31, 0xc4, 0x6e, 0x2c, 0x14, 0x86, 0x3b, 0x14, 0x0a, 0x3e, 0x08, 0x36, 0x13, 0x12, 0x27, 0x2e, 0x26, 0x95, 0x8d, 0x18,
0x08, 0x11, 0xea, 0x0d, 0xc4, 0xac, 0x74, 0x21, 0x21, 0xba, 0x95, 0x34, 0x7d, 0x94, 0x92, 0xa6, 0x13, 0x4c, 0xf0, 0x06, 0xc6, 0xae, 0x70, 0x61, 0x1a, 0x74, 0x4b, 0x4a, 0x79, 0x21, 0xa4, 0x94,
0x13, 0xd2, 0x38, 0x98, 0x9b, 0x78, 0x0a, 0xcf, 0xd1, 0x65, 0x8f, 0x60, 0xe3, 0x45, 0x24, 0x6f, 0x69, 0x4a, 0x9d, 0xa4, 0x37, 0x71, 0xef, 0xde, 0x73, 0xb0, 0xe4, 0x08, 0x52, 0x2f, 0x62, 0xfa,
0x4c, 0xdd, 0x0c, 0xef, 0xff, 0xff, 0x6f, 0x98, 0xff, 0x0d, 0x0c, 0x52, 0xed, 0xe7, 0x85, 0x2a, 0xc6, 0xe2, 0xc6, 0xcd, 0xe4, 0xfd, 0xff, 0xff, 0x65, 0xe6, 0x7f, 0x03, 0x9d, 0x58, 0x8f, 0xd3,
0x15, 0x77, 0x32, 0x9d, 0x24, 0xf9, 0xe2, 0x72, 0xb2, 0x52, 0x2b, 0x45, 0xd6, 0x6d, 0x33, 0x99, 0x4c, 0xe5, 0x8a, 0x3b, 0x89, 0x8e, 0xa2, 0x74, 0x71, 0x39, 0x58, 0xa9, 0x95, 0x22, 0xeb, 0xae,
0x74, 0xfa, 0xc5, 0x60, 0xf0, 0x88, 0xd5, 0x6b, 0xbc, 0x79, 0x47, 0xee, 0x82, 0x9d, 0x62, 0x25, 0x9a, 0x4c, 0x3a, 0xfc, 0x64, 0xd0, 0x99, 0x62, 0xf1, 0x1a, 0x6e, 0xde, 0x90, 0xbb, 0x60, 0xc7,
0x98, 0xc7, 0x66, 0xa3, 0xb0, 0x19, 0xf9, 0x35, 0x9c, 0x27, 0x05, 0xc6, 0x25, 0xbe, 0x15, 0xa8, 0x58, 0x08, 0xe6, 0xb1, 0x51, 0x2f, 0xa8, 0x46, 0x7e, 0x0d, 0xe7, 0x51, 0x86, 0x61, 0x8e, 0xf3,
0xd7, 0xbb, 0xb5, 0xda, 0x8a, 0x8e, 0xc7, 0x66, 0x76, 0x38, 0x36, 0x76, 0xf8, 0xe7, 0xf2, 0x2b, 0x0c, 0xf5, 0x7a, 0xb7, 0x56, 0x5b, 0xd1, 0xf0, 0xd8, 0xc8, 0x0e, 0xfa, 0xc6, 0x0e, 0x7e, 0x5d,
0x18, 0x65, 0x6a, 0xf9, 0x4f, 0xd9, 0x44, 0x0d, 0x33, 0xb5, 0x3c, 0x21, 0x02, 0xfa, 0x1a, 0x0b, 0x7e, 0x05, 0xbd, 0x44, 0x2d, 0xff, 0x28, 0x9b, 0xa8, 0x6e, 0xa2, 0x96, 0x27, 0x44, 0x40, 0x5b,
0x4a, 0xbb, 0x94, 0xb6, 0x92, 0x4f, 0xa0, 0xa7, 0x9b, 0x02, 0xa2, 0x47, 0x2f, 0x1b, 0xd1, 0xb8, 0x63, 0x46, 0x69, 0x93, 0xd2, 0x5a, 0xf2, 0x01, 0xb4, 0x74, 0x55, 0x40, 0xb4, 0xe8, 0x65, 0x23,
0x1b, 0x8c, 0x77, 0x28, 0x1c, 0xa2, 0x8d, 0x98, 0x7e, 0x40, 0x2f, 0xd0, 0xb8, 0x2d, 0xf9, 0x0d, 0x2a, 0x77, 0x83, 0xe1, 0x0e, 0x85, 0x43, 0xb4, 0x11, 0xc3, 0x0f, 0x06, 0x2d, 0x5f, 0xe3, 0x36,
0x74, 0xcb, 0x2a, 0x47, 0x6a, 0x3b, 0x9e, 0x5f, 0xf8, 0x66, 0x4d, 0x9f, 0x42, 0x73, 0x46, 0x55, 0xe7, 0xb7, 0xd0, 0xcc, 0x8b, 0x14, 0xa9, 0x6e, 0x7f, 0x72, 0x31, 0x36, 0x7b, 0x8e, 0x29, 0x34,
0x8e, 0x21, 0x41, 0xdc, 0x83, 0x4e, 0xaa, 0xa9, 0xfa, 0x70, 0xee, 0xb6, 0x68, 0xbb, 0x77, 0xd8, 0xe7, 0xac, 0x48, 0x31, 0x20, 0x88, 0x7b, 0xd0, 0x88, 0x35, 0x75, 0xef, 0x4e, 0xdc, 0x1a, 0xad,
0x49, 0xf5, 0xd4, 0x83, 0xb3, 0xd3, 0x25, 0xde, 0x07, 0xfb, 0xf9, 0x25, 0x72, 0x2d, 0x0e, 0xe0, 0x17, 0x0f, 0x1a, 0xb1, 0xe6, 0x37, 0xd0, 0x4e, 0x33, 0xd4, 0xf3, 0x58, 0x53, 0xf9, 0xff, 0x30,
0x3c, 0x04, 0x4f, 0x41, 0x14, 0xb8, 0xec, 0x5e, 0xec, 0x8f, 0xd2, 0x3a, 0x1c, 0xa5, 0xb5, 0xaf, 0xa7, 0x02, 0xa6, 0x7a, 0xe8, 0xc1, 0xd9, 0xe9, 0x7e, 0xde, 0x06, 0xfb, 0xf9, 0x65, 0xe6, 0x5a,
0x25, 0x3b, 0xd4, 0x92, 0x7d, 0xd7, 0x92, 0x7d, 0xfe, 0x48, 0x6b, 0xe1, 0xd0, 0x5f, 0xde, 0xfd, 0x1c, 0xc0, 0x79, 0xf4, 0x9f, 0xfc, 0x99, 0xef, 0xb2, 0x07, 0xb1, 0x3f, 0x4a, 0xeb, 0x70, 0x94,
0x06, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x21, 0x8f, 0x2c, 0x75, 0x01, 0x00, 0x00, 0xd6, 0xbe, 0x94, 0xec, 0x50, 0x4a, 0xf6, 0x55, 0x4a, 0xf6, 0xfe, 0x2d, 0xad, 0x85, 0x43, 0xff,
0x7e, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x45, 0x92, 0x5d, 0xa1, 0x01, 0x00, 0x00,
} }

View File

@ -43,4 +43,6 @@ message Event {
// A DELETE/EXPIRE event contains the deleted key with // A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion. // its modification revision set to the revision of deletion.
KeyValue kv = 2; KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
} }

View File

@ -38,9 +38,12 @@ type PageWriter struct {
bufWatermarkBytes int bufWatermarkBytes int
} }
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter { // NewPageWriter creates a new PageWriter. pageBytes is the number of bytes
// to write per page. pageOffset is the starting offset of io.Writer.
func NewPageWriter(w io.Writer, pageBytes, pageOffset int) *PageWriter {
return &PageWriter{ return &PageWriter{
w: w, w: w,
pageOffset: pageOffset,
pageBytes: pageBytes, pageBytes: pageBytes,
buf: make([]byte, defaultBufferBytes+pageBytes), buf: make([]byte, defaultBufferBytes+pageBytes),
bufWatermarkBytes: defaultBufferBytes, bufWatermarkBytes: defaultBufferBytes,

View File

@ -25,9 +25,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal

View File

@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal

View File

@ -29,7 +29,7 @@ import (
var ( var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with. // MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0" MinClusterVersion = "2.3.0"
Version = "3.0.10" Version = "3.0.12"
// Git SHA Value will be set during build // Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)" GitSHA = "Not provided (use ./build instead of go build)"

View File

@ -18,6 +18,7 @@ import (
"encoding/binary" "encoding/binary"
"hash" "hash"
"io" "io"
"os"
"sync" "sync"
"github.com/coreos/etcd/pkg/crc" "github.com/coreos/etcd/pkg/crc"
@ -39,9 +40,9 @@ type encoder struct {
uint64buf []byte uint64buf []byte
} }
func newEncoder(w io.Writer, prevCrc uint32) *encoder { func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
return &encoder{ return &encoder{
bw: ioutil.NewPageWriter(w, walPageBytes), bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
crc: crc.New(prevCrc, crcTable), crc: crc.New(prevCrc, crcTable),
// 1MB buffer // 1MB buffer
buf: make([]byte, 1024*1024), buf: make([]byte, 1024*1024),
@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
} }
} }
// newFileEncoder creates a new encoder with current file offset for the page writer.
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return nil, err
}
return newEncoder(f, prevCrc, int(offset)), nil
}
func (e *encoder) encode(rec *walpb.Record) error { func (e *encoder) encode(rec *walpb.Record) error {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()

View File

@ -120,7 +120,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
w := &WAL{ w := &WAL{
dir: dirpath, dir: dirpath,
metadata: metadata, metadata: metadata,
encoder: newEncoder(f, 0), }
w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
} }
w.locks = append(w.locks, f) w.locks = append(w.locks, f)
if err = w.saveCrc(0); err != nil { if err = w.saveCrc(0); err != nil {
@ -341,7 +344,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
if w.tail() != nil { if w.tail() != nil {
// create encoder (chain crc with the decoder), enable appending // create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC()) w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
if err != nil {
return
}
} }
w.decoder = nil w.decoder = nil
@ -375,7 +381,10 @@ func (w *WAL) cut() error {
// update writer and save the previous crc // update writer and save the previous crc
w.locks = append(w.locks, newTail) w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32() prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc) w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
if err = w.saveCrc(prevCrc); err != nil { if err = w.saveCrc(prevCrc); err != nil {
return err return err
} }
@ -414,7 +423,10 @@ func (w *WAL) cut() error {
w.locks[len(w.locks)-1] = newTail w.locks[len(w.locks)-1] = newTail
prevCrc = w.encoder.crc.Sum32() prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc) w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath) plog.Infof("segmented wal file %v is created", fpath)
return nil return nil

View File

@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
math "math" math "math"
)
import io "io" io "io"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal

View File

@ -61,7 +61,7 @@ func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []s
} }
resp, err := p.tr.RoundTrip(req) resp, err := p.tr.RoundTrip(req)
if err != nil { if err != nil {
s.recordFailure() s.recordFailure(err)
pinned = (pinned + 1) % len(endpoints) pinned = (pinned + 1) % len(endpoints)
continue continue
} }
@ -71,7 +71,7 @@ func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []s
err = d.Decode(&hh) err = d.Decode(&hh)
resp.Body.Close() resp.Body.Close()
if err != nil || !hh.OK { if err != nil || !hh.OK {
s.recordFailure() s.recordFailure(err)
pinned = (pinned + 1) % len(endpoints) pinned = (pinned + 1) % len(endpoints)
continue continue
} }

View File

@ -14,6 +14,7 @@ type Status interface {
Total() int64 Total() int64
Loss() int64 Loss() int64
Health() bool Health() bool
Err() error
// Estimated smoothed round trip time // Estimated smoothed round trip time
SRTT() time.Duration SRTT() time.Duration
// Estimated clock difference // Estimated clock difference
@ -27,6 +28,7 @@ type status struct {
total int64 total int64
loss int64 loss int64
health bool health bool
err error
clockdiff time.Duration clockdiff time.Duration
stopC chan struct{} stopC chan struct{}
} }
@ -56,6 +58,12 @@ func (s *status) Health() bool {
return s.health return s.health
} }
func (s *status) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.err
}
func (s *status) ClockDiff() time.Duration { func (s *status) ClockDiff() time.Duration {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -74,15 +82,17 @@ func (s *status) record(rtt time.Duration, when time.Time) {
s.health = true s.health = true
s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt)) s.srtt = time.Duration((1-α)*float64(s.srtt) + α*float64(rtt))
s.clockdiff = time.Now().Sub(when) - s.srtt/2 s.clockdiff = time.Now().Sub(when) - s.srtt/2
s.err = nil
} }
func (s *status) recordFailure() { func (s *status) recordFailure(err error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.total++ s.total++
s.health = false s.health = false
s.loss += 1 s.loss += 1
s.err = err
} }
func (s *status) reset() { func (s *status) reset() {
@ -91,6 +101,8 @@ func (s *status) reset() {
s.srtt = 0 s.srtt = 0
s.total = 0 s.total = 0
s.loss = 0
s.health = false s.health = false
s.clockdiff = 0 s.clockdiff = 0
s.err = nil
} }