From ce36431954a0e012a60a3dfa535903d662f69b1c Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 15 Jan 2015 16:00:49 -0500 Subject: [PATCH] WatchList should not convey events for the root key It's possible to watch /foo/bar, get events for /foo/bar/baz, and then if someone deletes /foo/bar you'll get an event for /foo/bar without a value. This results in an error being printed in the logs because /foo/bar has value "" and we can't decode that. This commit excludes the parent directory from watch events for now. --- pkg/tools/etcd_tools_watch.go | 31 ++++++++++++++--- pkg/tools/etcd_tools_watch_test.go | 53 +++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 9 deletions(-) diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index eab0b1753c1..6e5027acbb8 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { // watch.Interface. resourceVersion may be used to specify what version to begin // watching (e.g., for reconnecting without missing any updates). func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil) + w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil } @@ -90,7 +90,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface { // // Errors will be sent down the channel. func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { - w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform) + w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) return w } @@ -98,14 +98,25 @@ func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, trans // TransformFunc attempts to convert an object to another object for use with a watcher. type TransformFunc func(runtime.Object) (runtime.Object, error) +// includeFunc returns true if the given key should be considered part of a watch +type includeFunc func(key string) bool + +// exceptKey is an includeFunc that returns false when the provided key matches the watched key +func exceptKey(except string) includeFunc { + return func(key string) bool { + return key != except + } +} + // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { encoding runtime.Codec versioner EtcdResourceVersioner transform TransformFunc - list bool // If we're doing a recursive watch, should be true. - filter FilterFunc + list bool // If we're doing a recursive watch, should be true. + include includeFunc + filter FilterFunc etcdIncoming chan *etcd.Response etcdError chan error @@ -126,12 +137,13 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher { +func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, transform: transform, list: list, + include: include, filter: filter, etcdIncoming: make(chan *etcd.Response), etcdError: make(chan error, 1), @@ -258,6 +270,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { glog.Errorf("unexpected nil node: %#v", res) return } + if w.include != nil && !w.include(res.Node.Key) { + return + } data := []byte(res.Node.Value) obj, err := w.decodeObject(data, res.Node.ModifiedIndex) if err != nil { @@ -285,6 +300,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { glog.Errorf("unexpected nil node: %#v", res) return } + if w.include != nil && !w.include(res.Node.Key) { + return + } curData := []byte(res.Node.Value) curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex) if err != nil { @@ -331,6 +349,9 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { glog.Errorf("unexpected nil prev node: %#v", res) return } + if w.include != nil && !w.include(res.PrevNode.Key) { + return + } data := []byte(res.PrevNode.Value) index := res.PrevNode.ModifiedIndex if res.Node != nil { diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index ad5bd071634..67e508c6773 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -113,7 +113,7 @@ func TestWatchInterpretations(t *testing.T) { for name, item := range table { for _, action := range item.actions { - w := newEtcdWatcher(true, firstLetterIsB, codec, versioner, nil) + w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil) emitCalled := false w.emit = func(event watch.Event) { emitCalled = true @@ -151,7 +151,7 @@ func TestWatchInterpretations(t *testing.T) { } func TestWatchInterpretation_ResponseNotSet(t *testing.T) { - w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -165,7 +165,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { func TestWatchInterpretation_ResponseNoNode(t *testing.T) { actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -179,7 +179,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { func TestWatchInterpretation_ResponseBadData(t *testing.T) { actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -524,6 +524,51 @@ func TestWatchListFromZeroIndex(t *testing.T) { watching.Stop() } +func TestWatchListIgnoresRootKey(t *testing.T) { + codec := latest.Codec + pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} + + fakeClient := NewFakeEtcdClient(t) + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.WatchList("/some/key", 1, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + // This is the root directory of the watch, which happens to have a value encoded + fakeClient.WatchResponse <- &etcd.Response{ + Action: "delete", + PrevNode: &etcd.Node{ + Key: "/some/key", + Value: runtime.EncodeOrDie(codec, pod), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + } + // Delete of the parent directory of a key is an event that a list watch would receive, + // but will have no value so the decode will fail. + fakeClient.WatchResponse <- &etcd.Response{ + Action: "delete", + PrevNode: &etcd.Node{ + Key: "/some/key", + Value: "", + CreatedIndex: 1, + ModifiedIndex: 1, + }, + } + close(fakeClient.WatchStop) + + // the existing node is detected and the index set + _, open := <-watching.ResultChan() + if open { + t.Fatalf("unexpected channel open") + } + + watching.Stop() +} + func TestWatchFromNotFound(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.Data["/some/key"] = EtcdResponseWithError{