diff --git a/pkg/storage/etcd3/event.go b/pkg/storage/etcd3/event.go index 585c371b5dc..4746dafb421 100644 --- a/pkg/storage/etcd3/event.go +++ b/pkg/storage/etcd3/event.go @@ -24,15 +24,17 @@ import ( type event struct { key string value []byte + prevValue []byte rev int64 isDeleted bool isCreated bool } -func parseKV(kv *mvccpb.KeyValue) *event { +func parseKV(kv *mvccpb.KeyValue, prevVal []byte) *event { return &event{ key: string(kv.Key), value: kv.Value, + prevValue: prevVal, rev: kv.ModRevision, isDeleted: false, isCreated: kv.ModRevision == kv.CreateRevision, @@ -40,11 +42,15 @@ func parseKV(kv *mvccpb.KeyValue) *event { } func parseEvent(e *clientv3.Event) *event { - return &event{ + ret := &event{ key: string(e.Kv.Key), value: e.Kv.Value, rev: e.Kv.ModRevision, isDeleted: e.Type == clientv3.EventTypeDelete, isCreated: e.IsCreate(), } + if e.PrevKv != nil { + ret.prevValue = e.PrevKv.Value + } + return ret } diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 1a6bbbf2464..3c41b29e240 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -154,7 +154,15 @@ func (wc *watchChan) sync() error { wc.initialRev = getResp.Header.Revision for _, kv := range getResp.Kvs { - wc.sendEvent(parseKV(kv)) + prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1)) + if err != nil { + return err + } + var prevVal []byte + if len(prevResp.Kvs) > 0 { + prevVal = prevResp.Kvs[0].Value + } + wc.sendEvent(parseKV(kv, prevVal)) } return nil } @@ -170,7 +178,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { return } } - opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1)} + opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()} if wc.recursive { opts = append(opts, clientv3.WithPrefix()) } @@ -331,16 +339,12 @@ func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec r return nil, nil, err } } - if e.isDeleted || !e.isCreated { - getResp, err := client.Get(ctx, e.key, clientv3.WithRev(e.rev-1), clientv3.WithSerializable()) - if err != nil { - return nil, nil, err - } + if len(e.prevValue) > 0 { // Note that this sends the *old* object with the etcd revision for the time at // which it gets deleted. // We assume old object is returned only in Deleted event. Users (e.g. cacher) need // to have larger than previous rev to tell the ordering. - oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, e.rev) + oldObj, err = decodeObj(codec, versioner, e.prevValue, e.rev) if err != nil { return nil, nil, err }