diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 08f0f169f5c..5ae6da989f1 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -271,7 +271,8 @@ func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field f if err != nil { return nil, err } - return r.Watch(key, version), nil + // TODO: use generic.SelectionPredicate + return r.Watch(key, version, tools.Everything) } if field.Empty() { return r.WatchList(makeServiceListKey(ctx), version, tools.Everything) diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index aa98f392fe4..5bb51f54917 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -429,18 +429,7 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio return nil, err } - var watchKey string - if name, ok := m.MatchesSingle(); ok { - key, err := e.KeyFunc(ctx, name) - if err != nil { - return nil, err - } - watchKey = key - } else { - watchKey = e.KeyRootFunc(ctx) - } - - return e.Helper.WatchList(watchKey, version, func(obj runtime.Object) bool { + filterFunc := func(obj runtime.Object) bool { matches, err := m.Matches(obj) if err != nil { glog.Errorf("unable to match watch: %v", err) @@ -453,5 +442,15 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio } } return matches - }) + } + + if name, ok := m.MatchesSingle(); ok { + key, err := e.KeyFunc(ctx, name) + if err != nil { + return nil, err + } + return e.Helper.Watch(key, version, filterFunc) + } + + return e.Helper.WatchList(e.KeyRootFunc(ctx), version, filterFunc) } diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 1e586e55637..2725cddd59c 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -690,11 +690,16 @@ func TestEtcdWatch(t *testing.T) { for name, m := range table { podA := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}, - Spec: api.PodSpec{Host: "machine"}, + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + ResourceVersion: "1", + }, + Spec: api.PodSpec{Host: "machine"}, } respWithPodA := &etcd.Response{ Node: &etcd.Node{ + Key: "/registry/pods/default/foo", Value: runtime.EncodeOrDie(testapi.Codec(), podA), ModifiedIndex: 1, CreatedIndex: 1, diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go index f7081134941..0bc6b818792 100644 --- a/pkg/tools/etcd_helper_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -79,8 +79,10 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. // Errors will be sent down the channel. -func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface { - return h.WatchAndTransform(key, resourceVersion, nil) +func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { + w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil) + go w.etcdWatch(h.Client, key, resourceVersion) + return w, nil } // WatchAndTransform begins watching the specified key. Events are decoded into diff --git a/pkg/tools/etcd_helper_watch_test.go b/pkg/tools/etcd_helper_watch_test.go index ebc2f59569b..406b81404cc 100644 --- a/pkg/tools/etcd_helper_watch_test.go +++ b/pkg/tools/etcd_helper_watch_test.go @@ -207,7 +207,13 @@ func TestWatchEtcdError(t *testing.T) { fakeClient.WatchImmediateError = fmt.Errorf("immediate error") h := EtcdHelper{fakeClient, codec, versioner} - got := <-h.Watch("/some/key", 4).ResultChan() + watching, err := h.Watch("/some/key", 4, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watching.Stop() + + got := <-watching.ResultChan() if got.Type != watch.Error { t.Fatalf("Unexpected non-error") } @@ -229,7 +235,10 @@ func TestWatch(t *testing.T) { fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} h := EtcdHelper{fakeClient, codec, versioner} - watching := h.Watch("/some/key", 0) + watching, err := h.Watch("/some/key", 0, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } fakeClient.WaitForWatchCompletion() // when server returns not found, the watch index starts at the next value (1) @@ -398,7 +407,11 @@ func TestWatchEtcdState(t *testing.T) { fakeClient.Data[key] = value } h := EtcdHelper{fakeClient, codec, versioner} - watching := h.Watch("/somekey/foo", testCase.From) + watching, err := h.Watch("/somekey/foo", testCase.From, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() t.Logf("Testing %v", k) @@ -466,7 +479,10 @@ func TestWatchFromZeroIndex(t *testing.T) { fakeClient.Data["/some/key"] = testCase.Response h := EtcdHelper{fakeClient, codec, versioner} - watching := h.Watch("/some/key", 0) + watching, err := h.Watch("/some/key", 0, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } fakeClient.WaitForWatchCompletion() if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a { @@ -612,7 +628,10 @@ func TestWatchFromNotFound(t *testing.T) { } h := EtcdHelper{fakeClient, codec, versioner} - watching := h.Watch("/some/key", 0) + watching, err := h.Watch("/some/key", 0, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } fakeClient.WaitForWatchCompletion() if fakeClient.WatchIndex != 3 { @@ -635,7 +654,10 @@ func TestWatchFromOtherError(t *testing.T) { } h := EtcdHelper{fakeClient, codec, versioner} - watching := h.Watch("/some/key", 0) + watching, err := h.Watch("/some/key", 0, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } errEvent := <-watching.ResultChan() if e, a := watch.Error, errEvent.Type; e != a { @@ -665,7 +687,11 @@ func TestWatchPurposefulShutdown(t *testing.T) { fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} // Test purposeful shutdown - watching := h.Watch("/some/key", 0) + watching, err := h.Watch("/some/key", 0, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() watching.Stop() diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 97a26fd27a5..02a67faa9e7 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -101,7 +101,11 @@ func TestWatch(t *testing.T) { expectedVersion := resp.Node.ModifiedIndex // watch should load the object at the current index - w := helper.Watch(key, 0) + w, err := helper.Watch(key, 0, tools.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + event := <-w.ResultChan() if event.Type != watch.Added || event.Object == nil { t.Fatalf("expected first value to be set to ADDED, got %#v", event)