diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index f11c3dcd2a6..738a8294c5c 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -126,7 +126,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u if !ok { return } - resourceVersion = latest + resourceVersion = latest + 1 } _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) if err != etcd.ErrWatchStoppedByUser { @@ -146,11 +146,11 @@ func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, inc return } if index, ok := etcdErrorIndex(err); ok { - resourceVersion = index + 1 + resourceVersion = index } return } - resourceVersion = resp.EtcdIndex + 1 + resourceVersion = resp.EtcdIndex convertRecursiveResponse(resp.Node, resp, incoming) return } @@ -217,7 +217,7 @@ func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, erro return obj, nil } -func (w *etcdWatcher) sendCreate(res *etcd.Response) { +func (w *etcdWatcher) sendAdd(res *etcd.Response) { if res.Node == nil { glog.Errorf("unexpected nil node: %#v", res) return @@ -296,7 +296,14 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { return } data := []byte(res.PrevNode.Value) - obj, err := w.decodeObject(data, res.PrevNode.ModifiedIndex) + index := res.PrevNode.ModifiedIndex + if res.Node != nil { + // Note that this sends the *old* object with the etcd index for the time at + // which it gets deleted. This will allow users to restart the watch at the right + // index. + index = res.Node.ModifiedIndex + } + obj, err := w.decodeObject(data, index) if err != nil { glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode) // TODO: expose an error through watch.Interface? @@ -316,7 +323,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { func (w *etcdWatcher) sendResult(res *etcd.Response) { switch res.Action { case "create", "get": - w.sendCreate(res) + w.sendAdd(res) case "set", "compareAndSwap": w.sendModify(res) case "delete": diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 0a4c1e497e4..1e0b4eb9cbf 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -125,7 +125,7 @@ func TestWatch(t *testing.T) { expectedVersion = resp.Node.ModifiedIndex event = <-w.ResultChan() if event.Type != watch.Deleted { - t.Fatalf("expected deleted event", event) + t.Errorf("expected deleted event %#v", event) } pod = event.Object.(*api.Pod) if pod.ResourceVersion != expectedVersion {