From 0d69393a43e25437511366c510ab6becbfa2f3ff Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 19 Sep 2014 17:53:55 -0700 Subject: [PATCH 1/2] make watch actually return an error when there's an error --- pkg/tools/etcd_tools_watch.go | 42 +++++++++++++++++++++--------- pkg/tools/etcd_tools_watch_test.go | 13 +++++++++ pkg/tools/fake_etcd_client.go | 5 ++++ 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index f8a4d9f845f..6fd11a75a98 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -18,6 +18,7 @@ package tools import ( "sync" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -69,7 +70,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) - return w, nil + return w, <-w.immediateError } // TransformFunc attempts to convert an object to another object for use with a watcher. @@ -88,6 +89,11 @@ type etcdWatcher struct { etcdStop chan bool etcdCallEnded chan struct{} + // etcdWatch will send an error down this channel if the Watch fails. + // Otherwise, a nil will be sent down this channel watchWaitDuration + // after the watch starts. + immediateError chan error + outgoing chan watch.Event userStop chan struct{} stopped bool @@ -97,20 +103,24 @@ type etcdWatcher struct { emit func(watch.Event) } +// watchWaitDuration is the amount of time to wait for an error from watch. +const watchWaitDuration = 100 * time.Millisecond + // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. 
If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ - encoding: encoding, - versioner: versioner, - transform: transform, - list: list, - filter: filter, - etcdIncoming: make(chan *etcd.Response), - etcdStop: make(chan bool), - etcdCallEnded: make(chan struct{}), - outgoing: make(chan watch.Event), - userStop: make(chan struct{}), + encoding: encoding, + versioner: versioner, + transform: transform, + list: list, + filter: filter, + etcdIncoming: make(chan *etcd.Response), + etcdStop: make(chan bool), + etcdCallEnded: make(chan struct{}), + immediateError: make(chan error), + outgoing: make(chan watch.Event), + userStop: make(chan struct{}), } w.emit = func(e watch.Event) { w.outgoing <- e } go w.translate() @@ -118,10 +128,17 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versio } // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called -// as a goroutine. +// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will send nil over w.immediateError. func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdCallEnded) + go func() { + // This is racy; assume that Watch will fail within 100ms if it is going to fail. + // It's still more useful than blocking until the first result shows up. + // Trying to detect the 401: watch window expired error. 
+ <-time.After(watchWaitDuration) + w.immediateError <- nil + }() if resourceVersion == 0 { latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) if !ok { @@ -132,6 +149,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) if err != etcd.ErrWatchStoppedByUser { glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) + w.immediateError <- err } } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 67af1e28f35..24f7a71722e 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -198,6 +198,19 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { } } +func TestWatchEtcdError(t *testing.T) { + codec := latest.Codec + fakeClient := NewFakeEtcdClient(t) + fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} + fakeClient.WatchImmediateError = fmt.Errorf("immediate error") + h := EtcdHelper{fakeClient, codec, versioner} + + _, err := h.Watch("/some/key", 0) + if err == nil { + t.Fatalf("Unexpected non-error") + } +} + func TestWatch(t *testing.T) { codec := latest.Codec fakeClient := NewFakeEtcdClient(t) diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 96293a2670c..0f3c078f7a5 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -57,6 +57,8 @@ type FakeEtcdClient struct { // Write to this to prematurely stop a Watch that is running in a goroutine. WatchInjectError chan<- error WatchStop chan<- bool + // If non-nil, will be returned immediately when Watch is called. 
+ WatchImmediateError error } func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient { @@ -250,6 +252,9 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() { } func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { + if f.WatchImmediateError != nil { + return nil, f.WatchImmediateError + } f.WatchResponse = receiver f.WatchStop = stop f.WatchIndex = waitIndex From 4d7c6e2657a04d581851cd00f972451c906eb2b6 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 19 Sep 2014 18:09:40 -0700 Subject: [PATCH 2/2] Treat super short watches with no items received the same as watches that error immediately --- pkg/client/cache/reflector.go | 22 ++++++++++++++++++---- pkg/client/cache/reflector_test.go | 19 ++++++++++++++++++- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 1e309975048..72ceb7b88ab 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "errors" "fmt" "reflect" "time" @@ -99,7 +100,10 @@ func (r *Reflector) listAndWatch() { glog.Errorf("failed to watch %v: %v", r.expectedType, err) return } - r.watchHandler(w, &resourceVersion) + if err := r.watchHandler(w, &resourceVersion); err != nil { + glog.Errorf("failed to watch %v: %v", r.expectedType, err) + return + } } } @@ -119,12 +123,13 @@ func (r *Reflector) syncWith(items []runtime.Object) error { } // watchHandler watches w and keeps *resourceVersion up to date. 
-func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { +func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) error { + start := time.Now() + eventCount := 0 for { event, ok := <-w.ResultChan() if !ok { - glog.Errorf("unexpected watch close") - return + break } if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { glog.Errorf("expected type %v, but watch event object had type %v", e, a) @@ -149,5 +154,14 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { glog.Errorf("unable to understand watch event %#v", event) } *resourceVersion = jsonBase.ResourceVersion() + 1 + eventCount++ } + + watchDuration := time.Now().Sub(start) + if watchDuration < 1*time.Second && eventCount == 0 { + glog.Errorf("unexpected watch close - watch lasted less than a second and no items received") + return errors.New("very short watch") + } + glog.Infof("unexpected watch close - %v total items received", eventCount) + return nil } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 60963d793af..df706c3b1e6 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -35,6 +35,20 @@ func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) { return t.WatchFunc(resourceVersion) } +func TestReflector_watchHandlerError(t *testing.T) { + s := NewStore() + g := NewReflector(&testLW{}, &api.Pod{}, s) + fw := watch.NewFake() + go func() { + fw.Stop() + }() + var resumeRV uint64 + err := g.watchHandler(fw, &resumeRV) + if err == nil { + t.Errorf("unexpected non-error") + } +} + func TestReflector_watchHandler(t *testing.T) { s := NewStore() g := NewReflector(&testLW{}, &api.Pod{}, s) @@ -49,7 +63,10 @@ func TestReflector_watchHandler(t *testing.T) { fw.Stop() }() var resumeRV uint64 - g.watchHandler(fw, &resumeRV) + err := g.watchHandler(fw, &resumeRV) + if err != nil { + t.Errorf("unexpected error %v", err) + } table := 
[]struct { ID string