From 34db128af91dea51a5e9bab9b6f215df29079466 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Sun, 4 Dec 2016 18:37:07 -0800 Subject: [PATCH 1/2] etcd2: watch on 0 to return ADDED always --- pkg/storage/etcd/etcd_watcher.go | 5 ++--- pkg/storage/etcd/etcd_watcher_test.go | 22 +++++++++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 17dd3ea7602..9d1547174b9 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -359,9 +359,6 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { return } action := watch.Added - if res.Node.ModifiedIndex != res.Node.CreatedIndex { - action = watch.Modified - } w.emit(watch.Event{ Type: action, Object: obj, @@ -454,6 +451,8 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { func (w *etcdWatcher) sendResult(res *etcd.Response) { switch res.Action { case EtcdCreate, EtcdGet: + // "Get" will only happen in watch 0 case, where we explicitly want ADDED event + // for initial state. w.sendAdd(res) case EtcdSet, EtcdCAS: w.sendModify(res) diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index ce05a3920e4..5c6ef11240e 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -412,6 +412,18 @@ func TestWatchFromZeroIndex(t *testing.T) { } pod.ResourceVersion = "" + watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // The create trigger ADDED event when watching from 0 + event := <-watching.ResultChan() + watching.Stop() + if event.Type != watch.Added { + t.Errorf("Unexpected event %#v", event) + } + // check for concatenation on watch event with CAS updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { pod := input.(*api.Pod) @@ -423,15 +435,15 @@ func TestWatchFromZeroIndex(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) + watching, err = h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer watching.Stop() - // marked as modified b/c of concatenation - event := <-watching.ResultChan() - if event.Type != watch.Modified { + // because we watch from 0, first event that we receive will always be ADDED + event = <-watching.ResultChan() + if event.Type != watch.Added { t.Errorf("Unexpected event %#v", event) } @@ -451,7 +463,7 @@ func TestWatchFromZeroIndex(t *testing.T) { t.Errorf("Unexpected event %#v", event) } - if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { + if e, a := pod, event.Object; a == nil || !api.Semantic.DeepDerivative(e, a) { t.Errorf("Unexpected error: expected %#v, got %#v", e, a) } } From 62c0e5abc3fbe5d8a708d39d25ba539e85ab07e1 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Mon, 14 Nov 2016 21:35:56 -0800 Subject: [PATCH 2/2] pkg/storage: docs on watch 0 behavior --- pkg/storage/interfaces.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 3ce105301aa..801bbecddeb 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -112,6 +112,8 @@ type Interface interface { // resourceVersion may be used to specify what version to begin watching, // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). + // If resource version is "0", this interface will get current object at given key + // and send it in an "ADDED" event, before watch starts. Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) // WatchList begins watching the specified key's items. Items are decoded into API @@ -119,6 +121,8 @@ type Interface interface { // resourceVersion may be used to specify what version to begin watching, // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). + // If resource version is "0", this interface will list current objects directory defined by key + // and send them in "ADDED" events, before watch starts. WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) // Get unmarshals json found at key into objPtr. On a not found error, will either