From 265f889260c6621e0a1e53b3c7b17d00f33a65f2 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Sep 2014 16:12:32 -0700 Subject: [PATCH] convert etcd errors to watch errors --- pkg/tools/etcd_tools_watch.go | 86 +++++++++++++++--------------- pkg/tools/etcd_tools_watch_test.go | 62 ++++++++++++--------- 2 files changed, 78 insertions(+), 70 deletions(-) diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index 6fd11a75a98..6f0ada02519 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -48,7 +49,8 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. -func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { +// Errors will be sent down the channel. +func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface { return h.WatchAndTransform(key, resourceVersion, nil) } @@ -67,10 +69,11 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, // return value, nil // }) // -func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { +// Errors will be sent down the channel. +func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) - return w, <-w.immediateError + return w } // TransformFunc attempts to convert an object to another object for use with a watcher. @@ -86,14 +89,10 @@ type etcdWatcher struct { filter FilterFunc etcdIncoming chan *etcd.Response + etcdError chan error etcdStop chan bool etcdCallEnded chan struct{} - // etcdWatch will send an error down this channel if the Watch fails. - // Otherwise, a nil will be sent down this channel watchWaitDuration - // after the watch starts. - immediateError chan error - outgoing chan watch.Event userStop chan struct{} stopped bool @@ -110,17 +109,16 @@ const watchWaitDuration = 100 * time.Millisecond // and a versioner, the versioner must be able to handle the objects that transform creates. func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ - encoding: encoding, - versioner: versioner, - transform: transform, - list: list, - filter: filter, - etcdIncoming: make(chan *etcd.Response), - etcdStop: make(chan bool), - etcdCallEnded: make(chan struct{}), - immediateError: make(chan error), - outgoing: make(chan watch.Event), - userStop: make(chan struct{}), + encoding: encoding, + versioner: versioner, + transform: transform, + list: list, + filter: filter, + etcdIncoming: make(chan *etcd.Response), + etcdError: make(chan error, 1), + etcdStop: make(chan bool), + outgoing: make(chan watch.Event), + userStop: make(chan struct{}), } w.emit = func(e watch.Event) { w.outgoing <- e } go w.translate() @@ -128,46 +126,36 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versio } // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called -// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will +// as a goroutine. func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { defer util.HandleCrash() - defer close(w.etcdCallEnded) - go func() { - // This is racy; assume that Watch will fail within 100ms if it is going to fail. - // It's still more useful than blocking until the first result shows up. - // Trying to detect the 401: watch window expired error. - <-time.After(watchWaitDuration) - w.immediateError <- nil - }() + defer close(w.etcdError) if resourceVersion == 0 { - latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) - if !ok { + latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) + if err != nil { + w.etcdError <- err return } resourceVersion = latest + 1 } _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) - if err != etcd.ErrWatchStoppedByUser { - glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) - w.immediateError <- err + if err != nil && err != etcd.ErrWatchStoppedByUser { + w.etcdError <- err } } // etcdGetInitialWatchState turns an etcd Get request into a watch equivalent -func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) { - success = true - +func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { resp, err := client.Get(key, false, recursive) if err != nil { if !IsEtcdNotFound(err) { glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key) - success = false - return + return resourceVersion, err } if index, ok := etcdErrorIndex(err); ok { resourceVersion = index } - return + return resourceVersion, nil } resourceVersion = resp.EtcdIndex convertRecursiveResponse(resp.Node, resp, incoming) @@ -189,7 +177,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming incoming <- &copied } -// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be +// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be // called as a goroutine. func (w *etcdWatcher) translate() { defer close(w.outgoing) @@ -197,16 +185,26 @@ func (w *etcdWatcher) translate() { for { select { - case <-w.etcdCallEnded: + case err := <-w.etcdError: + if err != nil { + w.emit(watch.Event{ + watch.Error, + &api.Status{ + Status: api.StatusFailure, + Message: err.Error(), + }, + }) + } return case <-w.userStop: w.etcdStop <- true return case res, ok := <-w.etcdIncoming: - if !ok { - return + if ok { + w.sendResult(res) } - w.sendResult(res) + // If !ok, don't return here-- must wait for etcdError channel + // to give an error or be closed. } } } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 24f7a71722e..791c0f0fa53 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -205,10 +205,20 @@ func TestWatchEtcdError(t *testing.T) { fakeClient.WatchImmediateError = fmt.Errorf("immediate error") h := EtcdHelper{fakeClient, codec, versioner} - _, err := h.Watch("/some/key", 0) - if err == nil { + got := <-h.Watch("/some/key", 4).ResultChan() + if got.Type != watch.Error { t.Fatalf("Unexpected non-error") } + status, ok := got.Object.(*api.Status) + if !ok { + t.Fatalf("Unexpected non-error object type") + } + if status.Message != "immediate error" { + t.Errorf("Unexpected wrong error") + } + if status.Status != api.StatusFailure { + t.Errorf("Unexpected wrong error status") + } } func TestWatch(t *testing.T) { @@ -217,10 +227,7 @@ func TestWatch(t *testing.T) { fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() // when server returns not found, the watch index starts at the next value (1) @@ -249,6 +256,17 @@ func TestWatch(t *testing.T) { // Test error case fakeClient.WatchInjectError <- fmt.Errorf("Injected error") + if errEvent, ok := <-watching.ResultChan(); !ok { + t.Errorf("no error result?") + } else { + if e, a := watch.Error, errEvent.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := "Injected error", errEvent.Object.(*api.Status).Message; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } + // Did everything shut down? if _, open := <-fakeClient.WatchResponse; open { t.Errorf("An injected error did not cause a graceful shutdown") @@ -349,11 +367,7 @@ func TestWatchEtcdState(t *testing.T) { fakeClient.Data[key] = value } h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/somekey/foo", testCase.From) - if err != nil { - t.Errorf("%s: unexpected error: %v", k, err) - continue - } + watching := h.Watch("/somekey/foo", testCase.From) fakeClient.WaitForWatchCompletion() t.Logf("Testing %v", k) @@ -421,10 +435,7 @@ func TestWatchFromZeroIndex(t *testing.T) { fakeClient.Data["/some/key"] = testCase.Response h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("%s: unexpected error: %v", k, err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a { @@ -525,10 +536,7 @@ func TestWatchFromNotFound(t *testing.T) { } h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() if fakeClient.WatchIndex != 3 { @@ -551,9 +559,14 @@ func TestWatchFromOtherError(t *testing.T) { } h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + watching := h.Watch("/some/key", 0) + + errEvent := <-watching.ResultChan() + if e, a := watch.Error, errEvent.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := "101: () [2]", errEvent.Object.(*api.Status).Message; e != a { + t.Errorf("Expected %v, got %v", e, a) } select { @@ -576,10 +589,7 @@ func TestWatchPurposefulShutdown(t *testing.T) { fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} // Test purposeful shutdown - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() watching.Stop()