use Watch for single object instead of WatchList

This commit is contained in:
Masahiro Sano 2015-04-18 01:23:58 +09:00
parent 23e806604d
commit f90dc8f413
6 changed files with 63 additions and 26 deletions

View File

@ -271,7 +271,8 @@ func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field f
if err != nil { if err != nil {
return nil, err return nil, err
} }
return r.Watch(key, version), nil // TODO: use generic.SelectionPredicate
return r.Watch(key, version, tools.Everything)
} }
if field.Empty() { if field.Empty() {
return r.WatchList(makeServiceListKey(ctx), version, tools.Everything) return r.WatchList(makeServiceListKey(ctx), version, tools.Everything)

View File

@ -429,18 +429,7 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
return nil, err return nil, err
} }
var watchKey string filterFunc := func(obj runtime.Object) bool {
if name, ok := m.MatchesSingle(); ok {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
watchKey = key
} else {
watchKey = e.KeyRootFunc(ctx)
}
return e.Helper.WatchList(watchKey, version, func(obj runtime.Object) bool {
matches, err := m.Matches(obj) matches, err := m.Matches(obj)
if err != nil { if err != nil {
glog.Errorf("unable to match watch: %v", err) glog.Errorf("unable to match watch: %v", err)
@ -453,5 +442,15 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
} }
} }
return matches return matches
}) }
if name, ok := m.MatchesSingle(); ok {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
return e.Helper.Watch(key, version, filterFunc)
}
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, filterFunc)
} }

View File

@ -690,11 +690,16 @@ func TestEtcdWatch(t *testing.T) {
for name, m := range table { for name, m := range table {
podA := &api.Pod{ podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}, ObjectMeta: api.ObjectMeta{
Spec: api.PodSpec{Host: "machine"}, Name: "foo",
Namespace: api.NamespaceDefault,
ResourceVersion: "1",
},
Spec: api.PodSpec{Host: "machine"},
} }
respWithPodA := &etcd.Response{ respWithPodA := &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Key: "/registry/pods/default/foo",
Value: runtime.EncodeOrDie(testapi.Codec(), podA), Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 1, ModifiedIndex: 1,
CreatedIndex: 1, CreatedIndex: 1,

View File

@ -79,8 +79,10 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
// Watch begins watching the specified key. Events are decoded into // Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. // API objects and sent down the returned watch.Interface.
// Errors will be sent down the channel. // Errors will be sent down the channel.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface { func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
return h.WatchAndTransform(key, resourceVersion, nil) w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
} }
// WatchAndTransform begins watching the specified key. Events are decoded into // WatchAndTransform begins watching the specified key. Events are decoded into

View File

@ -207,7 +207,13 @@ func TestWatchEtcdError(t *testing.T) {
fakeClient.WatchImmediateError = fmt.Errorf("immediate error") fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := EtcdHelper{fakeClient, codec, versioner} h := EtcdHelper{fakeClient, codec, versioner}
got := <-h.Watch("/some/key", 4).ResultChan() watching, err := h.Watch("/some/key", 4, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
got := <-watching.ResultChan()
if got.Type != watch.Error { if got.Type != watch.Error {
t.Fatalf("Unexpected non-error") t.Fatalf("Unexpected non-error")
} }
@ -229,7 +235,10 @@ func TestWatch(t *testing.T) {
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
h := EtcdHelper{fakeClient, codec, versioner} h := EtcdHelper{fakeClient, codec, versioner}
watching := h.Watch("/some/key", 0) watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
// when server returns not found, the watch index starts at the next value (1) // when server returns not found, the watch index starts at the next value (1)
@ -398,7 +407,11 @@ func TestWatchEtcdState(t *testing.T) {
fakeClient.Data[key] = value fakeClient.Data[key] = value
} }
h := EtcdHelper{fakeClient, codec, versioner} h := EtcdHelper{fakeClient, codec, versioner}
watching := h.Watch("/somekey/foo", testCase.From) watching, err := h.Watch("/somekey/foo", testCase.From, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
t.Logf("Testing %v", k) t.Logf("Testing %v", k)
@ -466,7 +479,10 @@ func TestWatchFromZeroIndex(t *testing.T) {
fakeClient.Data["/some/key"] = testCase.Response fakeClient.Data["/some/key"] = testCase.Response
h := EtcdHelper{fakeClient, codec, versioner} h := EtcdHelper{fakeClient, codec, versioner}
watching := h.Watch("/some/key", 0) watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a { if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
@ -612,7 +628,10 @@ func TestWatchFromNotFound(t *testing.T) {
} }
h := EtcdHelper{fakeClient, codec, versioner} h := EtcdHelper{fakeClient, codec, versioner}
watching := h.Watch("/some/key", 0) watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 3 { if fakeClient.WatchIndex != 3 {
@ -635,7 +654,10 @@ func TestWatchFromOtherError(t *testing.T) {
} }
h := EtcdHelper{fakeClient, codec, versioner} h := EtcdHelper{fakeClient, codec, versioner}
watching := h.Watch("/some/key", 0) watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
errEvent := <-watching.ResultChan() errEvent := <-watching.ResultChan()
if e, a := watch.Error, errEvent.Type; e != a { if e, a := watch.Error, errEvent.Type; e != a {
@ -665,7 +687,11 @@ func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
// Test purposeful shutdown // Test purposeful shutdown
watching := h.Watch("/some/key", 0) watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
watching.Stop() watching.Stop()

View File

@ -101,7 +101,11 @@ func TestWatch(t *testing.T) {
expectedVersion := resp.Node.ModifiedIndex expectedVersion := resp.Node.ModifiedIndex
// watch should load the object at the current index // watch should load the object at the current index
w := helper.Watch(key, 0) w, err := helper.Watch(key, 0, tools.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event := <-w.ResultChan() event := <-w.ResultChan()
if event.Type != watch.Added || event.Object == nil { if event.Type != watch.Added || event.Object == nil {
t.Fatalf("expected first value to be set to ADDED, got %#v", event) t.Fatalf("expected first value to be set to ADDED, got %#v", event)