From da7e9783e8f4793d4d0dd54dcc68f86bb23df3fb Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Mon, 9 May 2016 22:11:04 -0700 Subject: [PATCH 1/2] etcd3/watcher: Event.Object should have the same rev as etcd delete instead of previous object's revision. --- pkg/storage/etcd3/watcher.go | 6 +++++- pkg/storage/etcd3/watcher_test.go | 36 ++++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index abff8a50061..1e13d59b584 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -322,7 +322,11 @@ func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec r if err != nil { return nil, nil, err } - oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, getResp.Kvs[0].ModRevision) + // Note that this sends the *old* object with the etcd revision for the time at + // which it gets deleted. + // We assume old object is returned only in Deleted event. Users (e.g. cacher) need + // to have larger than previous rev to tell the ordering. + oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, e.rev) if err != nil { return nil, nil, err } diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 3288c6047b1..330048cf060 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -20,11 +20,11 @@ import ( "errors" "fmt" "reflect" + "sync" "testing" "time" - "sync" - + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/api" @@ -123,7 +123,7 @@ func TestDeleteTriggerWatch(t *testing.T) { if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil { t.Fatalf("Delete failed: %v", err) } - testCheckResult(t, 0, watch.Deleted, w, storedObj) + testCheckResult(t, 0, watch.Deleted, w, nil) } // TestWatchSync tests that @@ -213,6 +213,36 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { wg.Wait() } +func TestWatchDeleteEventObjectShouldHaveLatestRV(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + + w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix()) + + if err := store.Delete(ctx, key, &api.Pod{}, &storage.Preconditions{}); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + e := <-w.ResultChan() + watchedDeleteObj := e.Object.(*api.Pod) + var wres clientv3.WatchResponse + wres = <-etcdW + + watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion) + if err != nil { + t.Fatalf("ParseWatchResourceVersion failed: %v", err) + } + if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision { + t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", + watchedDeleteRev, wres.Events[0].Kv.ModRevision) + } +} + type testWatchStruct struct { obj *api.Pod expectEvent bool From cd3f7f41c128f0f25a2490b9894246267727a697 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Tue, 10 May 2016 11:02:29 -0700 Subject: [PATCH 2/2] etcd3/watcher: refactor test --- pkg/storage/etcd3/store_test.go | 4 ++-- pkg/storage/etcd3/watcher_test.go | 31 +++++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index 4da97f60265..2fd686d2ee6 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -88,7 +88,7 @@ func TestCreateWithTTL(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } - testCheckResult(t, 0, watch.Deleted, w, nil) + testCheckEventType(t, watch.Deleted, w) } func TestCreateWithKeyExist(t *testing.T) { @@ -396,7 +396,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } - testCheckResult(t, 0, watch.Deleted, w, nil) + testCheckEventType(t, watch.Deleted, w) } func TestGuaranteedUpdateWithConflict(t *testing.T) { diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 330048cf060..aafef5792e4 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -46,7 +46,8 @@ func TestWatchList(t *testing.T) { // It tests that // - first occurrence of objects should notify Add event -// - +// - update should trigger Modified event +// - update that gets filtered should trigger Deleted event func testWatch(t *testing.T, recursive bool) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) @@ -90,6 +91,7 @@ func testWatch(t *testing.T, recursive bool) { if err != nil { t.Fatalf("Watch failed: %v", err) } + var prevObj *api.Pod for _, watchTest := range tt.watchTests { out := &api.Pod{} key := tt.key @@ -104,8 +106,14 @@ func testWatch(t *testing.T, recursive bool) { t.Fatalf("GuaranteedUpdate failed: %v", err) } if watchTest.expectEvent { - testCheckResult(t, i, watchTest.watchType, w, nil) + expectObj := out + if watchTest.watchType == watch.Deleted { + expectObj = prevObj + expectObj.ResourceVersion = out.ResourceVersion + } + testCheckResult(t, i, watchTest.watchType, w, expectObj) } + prevObj = out } w.Stop() testCheckStop(t, i, w) @@ -123,7 +131,7 @@ func TestDeleteTriggerWatch(t *testing.T) { if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil { t.Fatalf("Delete failed: %v", err) } - testCheckResult(t, 0, watch.Deleted, w, nil) + testCheckEventType(t, watch.Deleted, w) } // TestWatchSync tests that @@ -168,7 +176,7 @@ func TestWatchError(t *testing.T) { func(runtime.Object) (runtime.Object, error) { return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil })) - testCheckResult(t, 0, watch.Error, w, nil) + testCheckEventType(t, watch.Error, w) } func TestWatchContextCancel(t *testing.T) { @@ -213,7 +221,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { wg.Wait() } -func TestWatchDeleteEventObjectShouldHaveLatestRV(t *testing.T) { +func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) @@ -257,6 +265,17 @@ func (c *testCodec) Decode(data []byte, defaults *unversioned.GroupVersionKind, return nil, nil, errors.New("Expected decoding failure") } +func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) { + select { + case res := <-w.ResultChan(): + if res.Type != expectEventType { + t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) + } +} + func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *api.Pod) { select { case res := <-w.ResultChan(): @@ -264,7 +283,7 @@ func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w wat t.Errorf("#%d: event type want=%v, get=%v", i, expectEventType, res.Type) return } - if expectObj != nil && !reflect.DeepEqual(expectObj, res.Object) { + if !reflect.DeepEqual(expectObj, res.Object) { t.Errorf("#%d: obj want=\n%#v\nget=\n%#v", i, expectObj, res.Object) } case <-time.After(wait.ForeverTestTimeout):