diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go
index f8a4d9f845f..6fd11a75a98 100644
--- a/pkg/tools/etcd_tools_watch.go
+++ b/pkg/tools/etcd_tools_watch.go
@@ -18,6 +18,7 @@ package tools
 
 import (
 	"sync"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -69,7 +70,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
 func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
 	w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
 	go w.etcdWatch(h.Client, key, resourceVersion)
-	return w, nil
+	return w, <-w.immediateError
 }
 
 // TransformFunc attempts to convert an object to another object for use with a watcher.
@@ -88,6 +89,11 @@ type etcdWatcher struct {
 	etcdStop      chan bool
 	etcdCallEnded chan struct{}
 
+	// etcdWatch sends the Watch error down this channel if Watch fails.
+	// Otherwise nil is sent watchWaitDuration after the watch starts.
+	// Buffered (cap 2) so neither sender blocks if its value is never received.
+	immediateError chan error
+
 	outgoing chan watch.Event
 	userStop chan struct{}
 	stopped  bool
@@ -97,20 +103,24 @@ type etcdWatcher struct {
 	emit func(watch.Event)
 }
 
+// watchWaitDuration is the amount of time to wait for an error from watch.
+const watchWaitDuration = 100 * time.Millisecond
+
 // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
 // and a versioner, the versioner must be able to handle the objects that transform creates.
 func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
 	w := &etcdWatcher{
-		encoding:      encoding,
-		versioner:     versioner,
-		transform:     transform,
-		list:          list,
-		filter:        filter,
-		etcdIncoming:  make(chan *etcd.Response),
-		etcdStop:      make(chan bool),
-		etcdCallEnded: make(chan struct{}),
-		outgoing:      make(chan watch.Event),
-		userStop:      make(chan struct{}),
+		encoding:       encoding,
+		versioner:      versioner,
+		transform:      transform,
+		list:           list,
+		filter:         filter,
+		etcdIncoming:   make(chan *etcd.Response),
+		etcdStop:       make(chan bool),
+		etcdCallEnded:  make(chan struct{}),
+		immediateError: make(chan error, 2),
+		outgoing:       make(chan watch.Event),
+		userStop:       make(chan struct{}),
 	}
 	w.emit = func(e watch.Event) { w.outgoing <- e }
 	go w.translate()
@@ -118,10 +128,17 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versio
 }
 
 // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
-// as a goroutine.
+// as a goroutine. Sends the Watch error over w.immediateError if Watch fails; independently, nil is sent after watchWaitDuration.
 func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
 	defer util.HandleCrash()
 	defer close(w.etcdCallEnded)
+	go func() {
+		// This is racy; assume that Watch will fail within 100ms if it is going to fail.
+		// It's still more useful than blocking until the first result shows up.
+		// Trying to detect the 401: watch window expired error.
+		<-time.After(watchWaitDuration)
+		w.immediateError <- nil
+	}()
 	if resourceVersion == 0 {
 		latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
 		if !ok {
@@ -132,6 +149,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u
 	_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
 	if err != etcd.ErrWatchStoppedByUser {
 		glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
+		w.immediateError <- err
 	}
 }
 
diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go
index 67af1e28f35..24f7a71722e 100644
--- a/pkg/tools/etcd_tools_watch_test.go
+++ b/pkg/tools/etcd_tools_watch_test.go
@@ -198,6 +198,19 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
 	}
 }
 
+func TestWatchEtcdError(t *testing.T) {
+	codec := latest.Codec
+	fakeClient := NewFakeEtcdClient(t)
+	fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
+	fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
+	h := EtcdHelper{fakeClient, codec, versioner}
+
+	_, err := h.Watch("/some/key", 0)
+	if err == nil {
+		t.Fatalf("Unexpected non-error")
+	}
+}
+
 func TestWatch(t *testing.T) {
 	codec := latest.Codec
 	fakeClient := NewFakeEtcdClient(t)
diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go
index 96293a2670c..0f3c078f7a5 100644
--- a/pkg/tools/fake_etcd_client.go
+++ b/pkg/tools/fake_etcd_client.go
@@ -57,6 +57,8 @@ type FakeEtcdClient struct {
 	// Write to this to prematurely stop a Watch that is running in a goroutine.
 	WatchInjectError chan<- error
 	WatchStop        chan<- bool
+	// If non-nil, will be returned immediately when Watch is called.
+	WatchImmediateError error
 }
 
 func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
@@ -250,6 +252,9 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() {
 }
 
 func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
+	if f.WatchImmediateError != nil {
+		return nil, f.WatchImmediateError
+	}
 	f.WatchResponse = receiver
 	f.WatchStop = stop
 	f.WatchIndex = waitIndex