diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index abff8a50061..1e13d59b584 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -322,7 +322,11 @@ func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec r if err != nil { return nil, nil, err } - oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, getResp.Kvs[0].ModRevision) + // Note that this sends the *old* object with the etcd revision for the time at + // which it gets deleted. + // We assume old object is returned only in Deleted event. Users (e.g. cacher) need + // to have larger than previous rev to tell the ordering. + oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, e.rev) if err != nil { return nil, nil, err } diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 3288c6047b1..330048cf060 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -20,11 +20,11 @@ import ( "errors" "fmt" "reflect" + "sync" "testing" "time" - "sync" - + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" @@ -123,7 +123,7 @@ func TestDeleteTriggerWatch(t *testing.T) { if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil { t.Fatalf("Delete failed: %v", err) } - testCheckResult(t, 0, watch.Deleted, w, storedObj) + testCheckResult(t, 0, watch.Deleted, w, nil) } // TestWatchSync tests that @@ -213,6 +213,36 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { wg.Wait() } +func TestWatchDeleteEventObjectShouldHaveLatestRV(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + + w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix()) + + if err := store.Delete(ctx, key, &api.Pod{}, &storage.Preconditions{}); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + e := <-w.ResultChan() + watchedDeleteObj := e.Object.(*api.Pod) + var wres clientv3.WatchResponse + wres = <-etcdW + + watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion) + if err != nil { + t.Fatalf("ParseWatchResourceVersion failed: %v", err) + } + if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision { + t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", + watchedDeleteRev, wres.Events[0].Kv.ModRevision) + } +} + type testWatchStruct struct { obj *api.Pod expectEvent bool