diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 371454a9ba4..41407b65c3b 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -79,8 +79,15 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { } // WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(resourceVersion uint64) (watch.Interface, error) { - return r.WatchList("/registry/pods", resourceVersion, tools.Everything) +func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { + return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { + pod, ok := obj.(*api.Pod) + if !ok { + glog.Errorf("Unexpected object during pod watch: %#v", obj) + return false + } + return filter(pod) + }) } // GetPod gets a specific pod specified by its ID. diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index d65f82a080b..2aaf488deab 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -27,7 +27,7 @@ type Registry interface { // ListPods obtains a list of pods that match selector. ListPods(selector labels.Selector) ([]api.Pod, error) // Watch for new/changed/deleted pods - WatchPods(resourceVersion uint64) (watch.Interface, error) + WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) // Create a pod based on a specification. diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 2f9410c8d8c..eec4e143c2d 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -118,20 +118,14 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { // Watch begins watching for new, changed, or deleted pods. func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - source, err := rs.registry.WatchPods(resourceVersion) - if err != nil { - return nil, err - } - return watch.Filter(source, func(e watch.Event) (watch.Event, bool) { - pod := e.Object.(*api.Pod) + return rs.registry.WatchPods(resourceVersion, func(pod *api.Pod) bool { fields := labels.Set{ "ID": pod.ID, "DesiredState.Status": string(pod.DesiredState.Status), "DesiredState.Host": pod.DesiredState.Host, } - passesFilter := label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) - return e, passesFilter - }), nil + return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + }) } func (rs RegistryStorage) New() interface{} { diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 6efaaf7fa87..d097e7dafad 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -55,7 +55,8 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filtered, nil } -func (r *PodRegistry) WatchPods(resourceVersion uint64) (watch.Interface, error) { +func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { + // TODO: wire filter down into the mux; it needs access to current and previous state :( return r.mux.Watch(), nil } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 358d69ebefb..90bad267643 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -20,12 +20,8 @@ import ( "errors" "fmt" "reflect" - "sync" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" - "github.com/golang/glog" ) const ( @@ -301,256 +297,3 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E return err } } - -// FilterFunc is a predicate which takes an API object and returns true -// iff the object should remain in the set. -type FilterFunc func(obj interface{}) bool - -// Everything is a FilterFunc which accepts all objects. -func Everything(interface{}) bool { - return true -} - -// WatchList begins watching the specified key's items. Items are decoded into -// API objects, and any items passing 'filter' are sent down the returned -// watch.Interface. resourceVersion may be used to specify what version to begin -// watching (e.g., for reconnecting without missing any updates). -func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil) - go w.etcdWatch(h.Client, key, resourceVersion) - return w, nil -} - -// Watch begins watching the specified key. Events are decoded into -// API objects and sent down the returned watch.Interface. -func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { - return h.WatchAndTransform(key, resourceVersion, nil) -} - -// WatchAndTransform begins watching the specified key. Events are decoded into -// API objects and sent down the returned watch.Interface. If the transform -// function is provided, the value decoded from etcd will be passed to the function -// prior to being returned. -// -// The transform function can be used to populate data not available to etcd, or to -// change or wrap the serialized etcd object. -// -// startTime := time.Now() -// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) { -// value := input.(TimeAwareValue) -// value.Since = startTime -// return value, nil -// }) -// -func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { - w := newEtcdWatcher(false, nil, h.Codec, h.ResourceVersioner, transform) - go w.etcdWatch(h.Client, key, resourceVersion) - return w, nil -} - -// TransformFunc attempts to convert an object to another object for use with a watcher -type TransformFunc func(interface{}) (interface{}, error) - -// etcdWatcher converts a native etcd watch to a watch.Interface. -type etcdWatcher struct { - encoding Codec - versioner ResourceVersioner - transform TransformFunc - - list bool // If we're doing a recursive watch, should be true. - filter FilterFunc - - etcdIncoming chan *etcd.Response - etcdStop chan bool - etcdCallEnded chan struct{} - - outgoing chan watch.Event - userStop chan struct{} - stopped bool - stopLock sync.Mutex - - // Injectable for testing. Send the event down the outgoing channel. - emit func(watch.Event) -} - -// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform -// and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher { - w := &etcdWatcher{ - encoding: encoding, - versioner: versioner, - transform: transform, - list: list, - filter: filter, - etcdIncoming: make(chan *etcd.Response), - etcdStop: make(chan bool), - etcdCallEnded: make(chan struct{}), - outgoing: make(chan watch.Event), - userStop: make(chan struct{}), - } - w.emit = func(e watch.Event) { w.outgoing <- e } - go w.translate() - return w -} - -// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called -// as a goroutine. -func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { - defer util.HandleCrash() - defer close(w.etcdCallEnded) - if resourceVersion == 0 { - latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) - if !ok { - return - } - resourceVersion = latest - } - _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) - if err != etcd.ErrWatchStoppedByUser { - glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) - } -} - -// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent -func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) { - success = true - - resp, err := client.Get(key, false, recursive) - if err != nil { - if !IsEtcdNotFound(err) { - glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key) - success = false - return - } - if index, ok := etcdErrorIndex(err); ok { - resourceVersion = index + 1 - } - return - } - resourceVersion = resp.EtcdIndex + 1 - convertRecursiveResponse(resp.Node, resp, incoming) - return -} - -// convertRecursiveResponse turns a recursive get response from etcd into individual response objects -// by copying the original response. This emulates the behavior of a recursive watch. -func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) { - if node.Dir { - for i := range node.Nodes { - convertRecursiveResponse(node.Nodes[i], response, incoming) - } - return - } - copied := *response - if node.ModifiedIndex == node.CreatedIndex { - copied.Action = "create" - } else { - copied.Action = "set" - } - copied.Node = node - incoming <- &copied -} - -// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be -// called as a goroutine. -func (w *etcdWatcher) translate() { - defer close(w.outgoing) - defer util.HandleCrash() - - for { - select { - case <-w.etcdCallEnded: - return - case <-w.userStop: - w.etcdStop <- true - return - case res, ok := <-w.etcdIncoming: - if !ok { - return - } - w.sendResult(res) - } - } -} - -func (w *etcdWatcher) sendResult(res *etcd.Response) { - var action watch.EventType - var data []byte - var index uint64 - switch res.Action { - case "create": - if res.Node == nil { - glog.Errorf("unexpected nil node: %#v", res) - return - } - data = []byte(res.Node.Value) - index = res.Node.ModifiedIndex - action = watch.Added - case "set", "compareAndSwap", "get": - if res.Node == nil { - glog.Errorf("unexpected nil node: %#v", res) - return - } - data = []byte(res.Node.Value) - index = res.Node.ModifiedIndex - action = watch.Modified - case "delete": - if res.PrevNode == nil { - glog.Errorf("unexpected nil prev node: %#v", res) - return - } - data = []byte(res.PrevNode.Value) - index = res.Node.ModifiedIndex - action = watch.Deleted - default: - glog.Errorf("unknown action: %v", res.Action) - return - } - - obj, err := w.encoding.Decode(data) - if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node) - // TODO: expose an error through watch.Interface? - w.Stop() - return - } - - // ensure resource version is set on the object we load from etcd - if w.versioner != nil { - if err := w.versioner.SetResourceVersion(obj, index); err != nil { - glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err) - } - } - - // perform any necessary transformation - if w.transform != nil { - obj, err = w.transform(obj) - if err != nil { - glog.Errorf("failure to transform api object %#v: %v", obj, err) - // TODO: expose an error through watch.Interface? - w.Stop() - return - } - } - - w.emit(watch.Event{ - Type: action, - Object: obj, - }) -} - -// ResultChannel implements watch.Interface. -func (w *etcdWatcher) ResultChan() <-chan watch.Event { - return w.outgoing -} - -// Stop implements watch.Interface. -func (w *etcdWatcher) Stop() { - w.stopLock.Lock() - defer w.stopLock.Unlock() - // Prevent double channel closes. - if !w.stopped { - w.stopped = true - close(w.userStop) - } -} diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 2b36bc8e52d..b641a061cc9 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -22,12 +22,10 @@ import ( "reflect" "sync" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" ) @@ -364,379 +362,3 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value) } } - -func TestWatchInterpretation_ListCreate(t *testing.T) { - w := newEtcdWatcher(true, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := codec.Encode(pod) - - go w.sendResult(&etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(podBytes), - }, - }) - - got := <-w.outgoing - if e, a := watch.Added, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } -} - -func TestWatchInterpretation_ListAdd(t *testing.T) { - w := newEtcdWatcher(true, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := codec.Encode(pod) - - go w.sendResult(&etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(podBytes), - }, - }) - - got := <-w.outgoing - if e, a := watch.Modified, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } -} - -func TestWatchInterpretation_Delete(t *testing.T) { - w := newEtcdWatcher(true, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := codec.Encode(pod) - - go w.sendResult(&etcd.Response{ - Action: "delete", - Node: &etcd.Node{ - ModifiedIndex: 2, - }, - PrevNode: &etcd.Node{ - Value: string(podBytes), - ModifiedIndex: 1, - }, - }) - - got := <-w.outgoing - if e, a := watch.Deleted, got.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - pod.ResourceVersion = 2 - if e, a := pod, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } -} - -func TestWatchInterpretation_ResponseNotSet(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) - } - - w.sendResult(&etcd.Response{ - Action: "update", - }) -} - -func TestWatchInterpretation_ResponseNoNode(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) - } - w.sendResult(&etcd.Response{ - Action: "set", - }) -} - -func TestWatchInterpretation_ResponseBadData(t *testing.T) { - w := newEtcdWatcher(false, func(interface{}) bool { - t.Errorf("unexpected filter call") - return true - }, codec, versioner, nil) - w.emit = func(e watch.Event) { - t.Errorf("Unexpected emit: %v", e) - } - w.sendResult(&etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: "foobar", - }, - }) -} - -func TestWatch(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) - fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} - h := EtcdHelper{fakeClient, codec, versioner} - - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - fakeClient.WaitForWatchCompletion() - // when server returns not found, the watch index starts at the next value (1) - if fakeClient.WatchIndex != 1 { - t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient) - } - - // Test normal case - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := codec.Encode(pod) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "set", - Node: &etcd.Node{ - Value: string(podBytes), - }, - } - - event := <-watching.ResultChan() - if e, a := watch.Modified, event.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - - // Test error case - fakeClient.WatchInjectError <- fmt.Errorf("Injected error") - - // Did everything shut down? - if _, open := <-fakeClient.WatchResponse; open { - t.Errorf("An injected error did not cause a graceful shutdown") - } - if _, open := <-watching.ResultChan(); open { - t.Errorf("An injected error did not cause a graceful shutdown") - } -} - -func TestWatchFromZeroIndex(t *testing.T) { - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - - testCases := map[string]struct { - Response EtcdResponseWithError - ExpectedVersion uint64 - ExpectedType watch.EventType - }{ - "get value created": { - EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Value: api.EncodeOrDie(pod), - CreatedIndex: 1, - ModifiedIndex: 1, - }, - Action: "get", - EtcdIndex: 2, - }, - }, - 1, - watch.Added, - }, - "get value modified": { - EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Value: api.EncodeOrDie(pod), - CreatedIndex: 1, - ModifiedIndex: 2, - }, - Action: "get", - EtcdIndex: 3, - }, - }, - 2, - watch.Modified, - }, - } - - for k, testCase := range testCases { - fakeClient := NewFakeEtcdClient(t) - fakeClient.Data["/some/key"] = testCase.Response - h := EtcdHelper{fakeClient, codec, versioner} - - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("%s: unexpected error: %v", k, err) - } - - fakeClient.WaitForWatchCompletion() - if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a { - t.Errorf("%s: expected watch index to be %d, got %d", k, e, a) - } - - // the existing node is detected and the index set - event := <-watching.ResultChan() - if e, a := testCase.ExpectedType, event.Type; e != a { - t.Errorf("%s: expected %v, got %v", k, e, a) - } - actualPod, ok := event.Object.(*api.Pod) - if !ok { - t.Fatalf("%s: expected a pod, got %#v", k, event.Object) - } - if actualPod.ResourceVersion != testCase.ExpectedVersion { - t.Errorf("%s: expected pod with resource version %d, Got %#v", k, testCase.ExpectedVersion, actualPod) - } - pod.ResourceVersion = testCase.ExpectedVersion - if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { - t.Errorf("%s: expected %v, got %v", k, e, a) - } - watching.Stop() - } -} - -func TestWatchListFromZeroIndex(t *testing.T) { - pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - - fakeClient := NewFakeEtcdClient(t) - fakeClient.Data["/some/key"] = EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Dir: true, - Nodes: etcd.Nodes{ - &etcd.Node{ - Value: api.EncodeOrDie(pod), - ModifiedIndex: 1, - Nodes: etcd.Nodes{}, - }, - &etcd.Node{ - Value: api.EncodeOrDie(pod), - ModifiedIndex: 2, - Nodes: etcd.Nodes{}, - }, - }, - }, - Action: "get", - EtcdIndex: 3, - }, - } - h := EtcdHelper{fakeClient, codec, versioner} - - watching, err := h.WatchList("/some/key", 0, nil) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // the existing node is detected and the index set - event := <-watching.ResultChan() - for i := 0; i < 2; i++ { - if e, a := watch.Modified, event.Type; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - actualPod, ok := event.Object.(*api.Pod) - if !ok { - t.Fatalf("expected a pod, got %#v", event.Object) - } - if actualPod.ResourceVersion != 1 { - t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod) - } - pod.ResourceVersion = 1 - if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - } - - fakeClient.WaitForWatchCompletion() - watching.Stop() -} - -func TestWatchFromNotFound(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) - fakeClient.Data["/some/key"] = EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - Index: 2, - ErrorCode: 100, - }, - } - h := EtcdHelper{fakeClient, codec, versioner} - - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - fakeClient.WaitForWatchCompletion() - if fakeClient.WatchIndex != 3 { - t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient) - } - - watching.Stop() -} - -func TestWatchFromOtherError(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) - fakeClient.Data["/some/key"] = EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: &etcd.EtcdError{ - Index: 2, - ErrorCode: 101, - }, - } - h := EtcdHelper{fakeClient, codec, versioner} - - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - select { - case _, ok := <-watching.ResultChan(): - if ok { - t.Fatalf("expected result channel to be closed") - } - case <-time.After(1 * time.Millisecond): - t.Fatalf("watch should have closed channel: %#v", watching) - } - - if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 { - t.Fatalf("Watch should not have been invoked: %#v", fakeClient) - } -} - -func TestWatchPurposefulShutdown(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) - h := EtcdHelper{fakeClient, codec, versioner} - fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} - - // Test purposeful shutdown - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - watching.Stop() - - // Did everything shut down? - if _, open := <-fakeClient.WatchResponse; open { - t.Errorf("A stop did not cause a graceful shutdown") - } - if _, open := <-watching.ResultChan(); open { - t.Errorf("An injected error did not cause a graceful shutdown") - } -} diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go new file mode 100644 index 00000000000..738a8294c5c --- /dev/null +++ b/pkg/tools/etcd_tools_watch.go @@ -0,0 +1,350 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" +) + +// FilterFunc is a predicate which takes an API object and returns true +// iff the object should remain in the set. +type FilterFunc func(obj interface{}) bool + +// Everything is a FilterFunc which accepts all objects. +func Everything(interface{}) bool { + return true +} + +// WatchList begins watching the specified key's items. Items are decoded into +// API objects, and any items passing 'filter' are sent down the returned +// watch.Interface. resourceVersion may be used to specify what version to begin +// watching (e.g., for reconnecting without missing any updates). +func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { + w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil) + go w.etcdWatch(h.Client, key, resourceVersion) + return w, nil +} + +// Watch begins watching the specified key. Events are decoded into +// API objects and sent down the returned watch.Interface. +func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { + return h.WatchAndTransform(key, resourceVersion, nil) +} + +// WatchAndTransform begins watching the specified key. Events are decoded into +// API objects and sent down the returned watch.Interface. If the transform +// function is provided, the value decoded from etcd will be passed to the function +// prior to being returned. +// +// The transform function can be used to populate data not available to etcd, or to +// change or wrap the serialized etcd object. +// +// startTime := time.Now() +// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) { +// value := input.(TimeAwareValue) +// value.Since = startTime +// return value, nil +// }) +// +func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { + w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform) + go w.etcdWatch(h.Client, key, resourceVersion) + return w, nil +} + +// TransformFunc attempts to convert an object to another object for use with a watcher +type TransformFunc func(interface{}) (interface{}, error) + +// etcdWatcher converts a native etcd watch to a watch.Interface. +type etcdWatcher struct { + encoding Codec + versioner ResourceVersioner + transform TransformFunc + + list bool // If we're doing a recursive watch, should be true. + filter FilterFunc + + etcdIncoming chan *etcd.Response + etcdStop chan bool + etcdCallEnded chan struct{} + + outgoing chan watch.Event + userStop chan struct{} + stopped bool + stopLock sync.Mutex + + // Injectable for testing. Send the event down the outgoing channel. + emit func(watch.Event) +} + +// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform +// and a versioner, the versioner must be able to handle the objects that transform creates. +func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher { + w := &etcdWatcher{ + encoding: encoding, + versioner: versioner, + transform: transform, + list: list, + filter: filter, + etcdIncoming: make(chan *etcd.Response), + etcdStop: make(chan bool), + etcdCallEnded: make(chan struct{}), + outgoing: make(chan watch.Event), + userStop: make(chan struct{}), + } + w.emit = func(e watch.Event) { w.outgoing <- e } + go w.translate() + return w +} + +// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called +// as a goroutine. +func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { + defer util.HandleCrash() + defer close(w.etcdCallEnded) + if resourceVersion == 0 { + latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) + if !ok { + return + } + resourceVersion = latest + 1 + } + _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) + if err != etcd.ErrWatchStoppedByUser { + glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) + } +} + +// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent +func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) { + success = true + + resp, err := client.Get(key, false, recursive) + if err != nil { + if !IsEtcdNotFound(err) { + glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key) + success = false + return + } + if index, ok := etcdErrorIndex(err); ok { + resourceVersion = index + } + return + } + resourceVersion = resp.EtcdIndex + convertRecursiveResponse(resp.Node, resp, incoming) + return +} + +// convertRecursiveResponse turns a recursive get response from etcd into individual response objects +// by copying the original response. This emulates the behavior of a recursive watch. +func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) { + if node.Dir { + for i := range node.Nodes { + convertRecursiveResponse(node.Nodes[i], response, incoming) + } + return + } + copied := *response + copied.Action = "get" + copied.Node = node + incoming <- &copied +} + +// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be +// called as a goroutine. +func (w *etcdWatcher) translate() { + defer close(w.outgoing) + defer util.HandleCrash() + + for { + select { + case <-w.etcdCallEnded: + return + case <-w.userStop: + w.etcdStop <- true + return + case res, ok := <-w.etcdIncoming: + if !ok { + return + } + w.sendResult(res) + } + } +} + +func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, error) { + obj, err := w.encoding.Decode(data) + if err != nil { + return nil, err + } + + // ensure resource version is set on the object we load from etcd + if w.versioner != nil { + if err := w.versioner.SetResourceVersion(obj, index); err != nil { + glog.Errorf("failure to version api object (%d) %#v: %v", index, obj, err) + } + } + + // perform any necessary transformation + if w.transform != nil { + obj, err = w.transform(obj) + if err != nil { + glog.Errorf("failure to transform api object %#v: %v", obj, err) + return nil, err + } + } + + return obj, nil +} + +func (w *etcdWatcher) sendAdd(res *etcd.Response) { + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return + } + data := []byte(res.Node.Value) + obj, err := w.decodeObject(data, res.Node.ModifiedIndex) + if err != nil { + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node) + // TODO: expose an error through watch.Interface? + // Ignore this value. If we stop the watch on a bad value, a client that uses + // the resourceVersion to resume will never be able to get past a bad value. + return + } + if !w.filter(obj) { + return + } + action := watch.Added + if res.Node.ModifiedIndex != res.Node.CreatedIndex { + action = watch.Modified + } + w.emit(watch.Event{ + Type: action, + Object: obj, + }) +} + +func (w *etcdWatcher) sendModify(res *etcd.Response) { + if res.Node == nil { + glog.Errorf("unexpected nil node: %#v", res) + return + } + curData := []byte(res.Node.Value) + curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex) + if err != nil { + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(curData), res, res.Node) + // TODO: expose an error through watch.Interface? + // Ignore this value. If we stop the watch on a bad value, a client that uses + // the resourceVersion to resume will never be able to get past a bad value. + return + } + curObjPasses := w.filter(curObj) + oldObjPasses := false + var oldObj interface{} + if res.PrevNode != nil && res.PrevNode.Value != "" { + // Ignore problems reading the old object. + if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil { + oldObjPasses = w.filter(oldObj) + } + } + // Some changes to an object may cause it to start or stop matching a filter. + // We need to report those as adds/deletes. So we have to check both the previous + // and current value of the object. + switch { + case curObjPasses && oldObjPasses: + w.emit(watch.Event{ + Type: watch.Modified, + Object: curObj, + }) + case curObjPasses && !oldObjPasses: + w.emit(watch.Event{ + Type: watch.Added, + Object: curObj, + }) + case !curObjPasses && oldObjPasses: + w.emit(watch.Event{ + Type: watch.Deleted, + Object: oldObj, + }) + } + // Do nothing if neither new nor old object passed the filter. +} + +func (w *etcdWatcher) sendDelete(res *etcd.Response) { + if res.PrevNode == nil { + glog.Errorf("unexpected nil prev node: %#v", res) + return + } + data := []byte(res.PrevNode.Value) + index := res.PrevNode.ModifiedIndex + if res.Node != nil { + // Note that this sends the *old* object with the etcd index for the time at + // which it gets deleted. This will allow users to restart the watch at the right + // index. + index = res.Node.ModifiedIndex + } + obj, err := w.decodeObject(data, index) + if err != nil { + glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.PrevNode) + // TODO: expose an error through watch.Interface? + // Ignore this value. If we stop the watch on a bad value, a client that uses + // the resourceVersion to resume will never be able to get past a bad value. + return + } + if !w.filter(obj) { + return + } + w.emit(watch.Event{ + Type: watch.Deleted, + Object: obj, + }) +} + +func (w *etcdWatcher) sendResult(res *etcd.Response) { + switch res.Action { + case "create", "get": + w.sendAdd(res) + case "set", "compareAndSwap": + w.sendModify(res) + case "delete": + w.sendDelete(res) + default: + glog.Errorf("unknown action: %v", res.Action) + } +} + +// ResultChannel implements watch.Interface. +func (w *etcdWatcher) ResultChan() <-chan watch.Event { + return w.outgoing +} + +// Stop implements watch.Interface. +func (w *etcdWatcher) Stop() { + w.stopLock.Lock() + defer w.stopLock.Unlock() + // Prevent double channel closes. + if !w.stopped { + w.stopped = true + close(w.userStop) + } +} diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go new file mode 100644 index 00000000000..e0f04925bcd --- /dev/null +++ b/pkg/tools/etcd_tools_watch_test.go @@ -0,0 +1,457 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/coreos/go-etcd/etcd" +) + +func TestWatchInterpretations(t *testing.T) { + // Declare some pods to make the test cases compact. + podFoo := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBar := &api.Pod{JSONBase: api.JSONBase{ID: "bar"}} + podBaz := &api.Pod{JSONBase: api.JSONBase{ID: "baz"}} + firstLetterIsB := func(obj interface{}) bool { + return obj.(*api.Pod).ID[0] == 'b' + } + + // All of these test cases will be run with the firstLetterIsB FilterFunc. + table := map[string]struct { + actions []string // Run this test item for every action here. + prevNodeValue string + nodeValue string + expectEmit bool + expectType watch.EventType + expectObject interface{} + }{ + "create": { + actions: []string{"create", "get"}, + nodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Added, + expectObject: podBar, + }, + "create but filter blocks": { + actions: []string{"create", "get"}, + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: false, + }, + "delete": { + actions: []string{"delete"}, + prevNodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Deleted, + expectObject: podBar, + }, + "delete but filter blocks": { + actions: []string{"delete"}, + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: false, + }, + "modify appears to create 1": { + actions: []string{"set", "compareAndSwap"}, + nodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Added, + expectObject: podBar, + }, + "modify appears to create 2": { + actions: []string{"set", "compareAndSwap"}, + prevNodeValue: api.EncodeOrDie(podFoo), + nodeValue: api.EncodeOrDie(podBar), + expectEmit: true, + expectType: watch.Added, + expectObject: podBar, + }, + "modify appears to delete": { + actions: []string{"set", "compareAndSwap"}, + prevNodeValue: api.EncodeOrDie(podBar), + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: true, + expectType: watch.Deleted, + expectObject: podBar, // Should return last state that passed the filter! + }, + "modify modifies": { + actions: []string{"set", "compareAndSwap"}, + prevNodeValue: api.EncodeOrDie(podBar), + nodeValue: api.EncodeOrDie(podBaz), + expectEmit: true, + expectType: watch.Modified, + expectObject: podBaz, + }, + "modify ignores": { + actions: []string{"set", "compareAndSwap"}, + nodeValue: api.EncodeOrDie(podFoo), + expectEmit: false, + }, + } + + for name, item := range table { + for _, action := range item.actions { + w := newEtcdWatcher(true, firstLetterIsB, codec, versioner, nil) + emitCalled := false + w.emit = func(event watch.Event) { + emitCalled = true + if !item.expectEmit { + return + } + if e, a := item.expectType, event.Type; e != a { + t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) + } + if e, a := item.expectObject, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) + } + } + + var n, pn *etcd.Node + if item.nodeValue != "" { + n = &etcd.Node{Value: item.nodeValue} + } + if item.prevNodeValue != "" { + pn = &etcd.Node{Value: item.prevNodeValue} + } + + w.sendResult(&etcd.Response{ + Action: action, + Node: n, + PrevNode: pn, + }) + + if e, a := item.expectEmit, emitCalled; e != a { + t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a) + } + w.Stop() + } + } +} + +func TestWatchInterpretation_ResponseNotSet(t *testing.T) { + w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + + w.sendResult(&etcd.Response{ + Action: "update", + }) + w.Stop() +} + +func TestWatchInterpretation_ResponseNoNode(t *testing.T) { + actions := []string{"create", "set", "compareAndSwap", "delete"} + for _, action := range actions { + w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + w.sendResult(&etcd.Response{ + Action: action, + }) + w.Stop() + } +} + +func TestWatchInterpretation_ResponseBadData(t *testing.T) { + actions := []string{"create", "set", "compareAndSwap", "delete"} + for _, action := range actions { + w := newEtcdWatcher(false, Everything, codec, versioner, nil) + w.emit = func(e watch.Event) { + t.Errorf("Unexpected emit: %v", e) + } + w.sendResult(&etcd.Response{ + Action: action, + Node: &etcd.Node{ + Value: "foobar", + }, + }) + w.sendResult(&etcd.Response{ + Action: action, + PrevNode: &etcd.Node{ + Value: "foobar", + }, + }) + w.Stop() + } +} + +func TestWatch(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + fakeClient.WaitForWatchCompletion() + // when server returns not found, the watch index starts at the next value (1) + if fakeClient.WatchIndex != 1 { + t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient) + } + + // Test normal case + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + podBytes, _ := codec.Encode(pod) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(podBytes), + }, + } + + event := <-watching.ResultChan() + if e, a := watch.Added, event.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + + // Test error case + fakeClient.WatchInjectError <- fmt.Errorf("Injected error") + + // Did everything shut down? + if _, open := <-fakeClient.WatchResponse; open { + t.Errorf("An injected error did not cause a graceful shutdown") + } + if _, open := <-watching.ResultChan(); open { + t.Errorf("An injected error did not cause a graceful shutdown") + } +} + +func TestWatchFromZeroIndex(t *testing.T) { + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + + testCases := map[string]struct { + Response EtcdResponseWithError + ExpectedVersion uint64 + ExpectedType watch.EventType + }{ + "get value created": { + EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: api.EncodeOrDie(pod), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + Action: "get", + EtcdIndex: 2, + }, + }, + 1, + watch.Added, + }, + "get value modified": { + EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: api.EncodeOrDie(pod), + CreatedIndex: 1, + ModifiedIndex: 2, + }, + Action: "get", + EtcdIndex: 3, + }, + }, + 2, + watch.Modified, + }, + } + + for k, testCase := range testCases { + fakeClient := NewFakeEtcdClient(t) + fakeClient.Data["/some/key"] = testCase.Response + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("%s: unexpected error: %v", k, err) + } + + fakeClient.WaitForWatchCompletion() + if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a { + t.Errorf("%s: expected watch index to be %d, got %d", k, e, a) + } + + // the existing node is detected and the index set + event := <-watching.ResultChan() + if e, a := testCase.ExpectedType, event.Type; e != a { + t.Errorf("%s: expected %v, got %v", k, e, a) + } + actualPod, ok := event.Object.(*api.Pod) + if !ok { + t.Fatalf("%s: expected a pod, got %#v", k, event.Object) + } + if actualPod.ResourceVersion != testCase.ExpectedVersion { + t.Errorf("%s: expected pod with resource version %d, Got %#v", k, testCase.ExpectedVersion, actualPod) + } + pod.ResourceVersion = testCase.ExpectedVersion + if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("%s: expected %v, got %v", k, e, a) + } + watching.Stop() + } +} + +func TestWatchListFromZeroIndex(t *testing.T) { + pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + + fakeClient := NewFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Dir: true, + Nodes: etcd.Nodes{ + &etcd.Node{ + Value: api.EncodeOrDie(pod), + CreatedIndex: 1, + ModifiedIndex: 1, + Nodes: etcd.Nodes{}, + }, + &etcd.Node{ + Value: api.EncodeOrDie(pod), + CreatedIndex: 2, + ModifiedIndex: 2, + Nodes: etcd.Nodes{}, + }, + }, + }, + Action: "get", + EtcdIndex: 3, + }, + } + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.WatchList("/some/key", 0, Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // the existing node is detected and the index set + event, open := <-watching.ResultChan() + if !open { + t.Fatalf("unexpected channel close") + } + for i := 0; i < 2; i++ { + if e, a := watch.Added, event.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + actualPod, ok := event.Object.(*api.Pod) + if !ok { + t.Fatalf("expected a pod, got %#v", event.Object) + } + if actualPod.ResourceVersion != 1 { + t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod) + } + pod.ResourceVersion = 1 + if e, a := pod, event.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + fakeClient.WaitForWatchCompletion() + watching.Stop() +} + +func TestWatchFromNotFound(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ + Index: 2, + ErrorCode: 100, + }, + } + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + fakeClient.WaitForWatchCompletion() + if fakeClient.WatchIndex != 3 { + t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient) + } + + watching.Stop() +} + +func TestWatchFromOtherError(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ + Index: 2, + ErrorCode: 101, + }, + } + h := EtcdHelper{fakeClient, codec, versioner} + + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + select { + case _, ok := <-watching.ResultChan(): + if ok { + t.Fatalf("expected result channel to be closed") + } + case <-time.After(1 * time.Millisecond): + t.Fatalf("watch should have closed channel: %#v", watching) + } + + if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 { + t.Fatalf("Watch should not have been invoked: %#v", fakeClient) + } +} + +func TestWatchPurposefulShutdown(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + h := EtcdHelper{fakeClient, codec, versioner} + fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} + + // Test purposeful shutdown + watching, err := h.Watch("/some/key", 0) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + watching.Stop() + + // Did everything shut down? + if _, open := <-fakeClient.WatchResponse; open { + t.Errorf("A stop did not cause a graceful shutdown") + } + if _, open := <-watching.ResultChan(); open { + t.Errorf("An injected error did not cause a graceful shutdown") + } +} diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 0a4c1e497e4..1e0b4eb9cbf 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -125,7 +125,7 @@ func TestWatch(t *testing.T) { expectedVersion = resp.Node.ModifiedIndex event = <-w.ResultChan() if event.Type != watch.Deleted { - t.Fatalf("expected deleted event", event) + t.Errorf("expected deleted event %#v", event) } pod = event.Object.(*api.Pod) if pod.ResourceVersion != expectedVersion {