diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 17dd3ea7602..9d1547174b9 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -359,9 +359,6 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { return } action := watch.Added - if res.Node.ModifiedIndex != res.Node.CreatedIndex { - action = watch.Modified - } w.emit(watch.Event{ Type: action, Object: obj, @@ -454,6 +451,8 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { func (w *etcdWatcher) sendResult(res *etcd.Response) { switch res.Action { case EtcdCreate, EtcdGet: + // "Get" will only happen in watch 0 case, where we explicitly want ADDED event + // for initial state. w.sendAdd(res) case EtcdSet, EtcdCAS: w.sendModify(res) diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index ce05a3920e4..5c6ef11240e 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -412,6 +412,18 @@ func TestWatchFromZeroIndex(t *testing.T) { } pod.ResourceVersion = "" + watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // The create trigger ADDED event when watching from 0 + event := <-watching.ResultChan() + watching.Stop() + if event.Type != watch.Added { + t.Errorf("Unexpected event %#v", event) + } + // check for concatenation on watch event with CAS updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { pod := input.(*api.Pod) @@ -423,15 +435,15 @@ func TestWatchFromZeroIndex(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) + watching, err = h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer watching.Stop() - // marked as modified b/c of concatenation - event := <-watching.ResultChan() - if event.Type != watch.Modified { + // because we watch from 0, first event that we receive will always be ADDED + event = <-watching.ResultChan() + if event.Type != watch.Added { t.Errorf("Unexpected event %#v", event) } @@ -451,7 +463,7 @@ func TestWatchFromZeroIndex(t *testing.T) { t.Errorf("Unexpected event %#v", event) } - if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { + if e, a := pod, event.Object; a == nil || !api.Semantic.DeepDerivative(e, a) { t.Errorf("Unexpected error: expected %#v, got %#v", e, a) } } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 3ce105301aa..801bbecddeb 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -112,6 +112,8 @@ type Interface interface { // resourceVersion may be used to specify what version to begin watching, // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). + // If resource version is "0", this interface will get current object at given key + // and send it in an "ADDED" event, before watch starts. Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) // WatchList begins watching the specified key's items. Items are decoded into API @@ -119,6 +121,8 @@ type Interface interface { // resourceVersion may be used to specify what version to begin watching, // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). + // If resource version is "0", this interface will list current objects directory defined by key + // and send them in "ADDED" events, before watch starts. WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) // Get unmarshals json found at key into objPtr. On a not found error, will either