From 05eff2e910b2ade853d04723fa362a230b11eba4 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Sep 2014 16:11:43 -0700 Subject: [PATCH 1/4] add new watch event type --- pkg/watch/watch.go | 13 +++++++++++-- pkg/watch/watch_test.go | 2 ++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index bed7163f144..e70ae15e93f 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -41,14 +41,18 @@ const ( Added EventType = "ADDED" Modified EventType = "MODIFIED" Deleted EventType = "DELETED" + Error EventType = "ERROR" ) // Event represents a single event to a watched resource. type Event struct { Type EventType - // If Type == Deleted, then this is the state of the object - // immediately before deletion. + // Object is: + // * If Type is Added or Modified: the new state of the object. + // * If Type is Deleted: the state of the object immediately before deletion. + // * If Type is Error: *api.Status is recommended; other types may make sense + // depending on context. Object runtime.Object } @@ -94,6 +98,11 @@ func (f *FakeWatcher) Delete(lastValue runtime.Object) { f.result <- Event{Deleted, lastValue} } +// Error sends an Error event. +func (f *FakeWatcher) Error(errValue runtime.Object) { + f.result <- Event{Error, errValue} +} + // Action sends an event of the requested type, for table-based testing. func (f *FakeWatcher) Action(action EventType, obj runtime.Object) { f.result <- Event{action, obj} diff --git a/pkg/watch/watch_test.go b/pkg/watch/watch_test.go index a44bad88aff..2ab8b4d2c5e 100644 --- a/pkg/watch/watch_test.go +++ b/pkg/watch/watch_test.go @@ -35,6 +35,7 @@ func TestFake(t *testing.T) { {Modified, testType("qux")}, {Modified, testType("bar")}, {Deleted, testType("bar")}, + {Error, testType("error: blah")}, } // Prove that f implements Interface by phrasing this as a function. 
@@ -62,6 +63,7 @@ func TestFake(t *testing.T) { f.Action(Modified, testType("qux")) f.Modify(testType("bar")) f.Delete(testType("bar")) + f.Error(testType("error: blah")) f.Stop() } From 265f889260c6621e0a1e53b3c7b17d00f33a65f2 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Sep 2014 16:12:32 -0700 Subject: [PATCH 2/4] convert etcd errors to watch errors --- pkg/tools/etcd_tools_watch.go | 86 +++++++++++++++--------------- pkg/tools/etcd_tools_watch_test.go | 62 ++++++++++++--------- 2 files changed, 78 insertions(+), 70 deletions(-) diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index 6fd11a75a98..6f0ada02519 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -48,7 +49,8 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. -func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) { +// Errors will be sent down the channel. +func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface { return h.WatchAndTransform(key, resourceVersion, nil) } @@ -67,10 +69,11 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, // return value, nil // }) // -func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) { +// Errors will be sent down the channel. 
+func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform) go w.etcdWatch(h.Client, key, resourceVersion) - return w, <-w.immediateError + return w } // TransformFunc attempts to convert an object to another object for use with a watcher. @@ -86,14 +89,10 @@ type etcdWatcher struct { filter FilterFunc etcdIncoming chan *etcd.Response + etcdError chan error etcdStop chan bool etcdCallEnded chan struct{} - // etcdWatch will send an error down this channel if the Watch fails. - // Otherwise, a nil will be sent down this channel watchWaitDuration - // after the watch starts. - immediateError chan error - outgoing chan watch.Event userStop chan struct{} stopped bool @@ -110,17 +109,16 @@ const watchWaitDuration = 100 * time.Millisecond // and a versioner, the versioner must be able to handle the objects that transform creates. func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ - encoding: encoding, - versioner: versioner, - transform: transform, - list: list, - filter: filter, - etcdIncoming: make(chan *etcd.Response), - etcdStop: make(chan bool), - etcdCallEnded: make(chan struct{}), - immediateError: make(chan error), - outgoing: make(chan watch.Event), - userStop: make(chan struct{}), + encoding: encoding, + versioner: versioner, + transform: transform, + list: list, + filter: filter, + etcdIncoming: make(chan *etcd.Response), + etcdError: make(chan error, 1), + etcdStop: make(chan bool), + outgoing: make(chan watch.Event), + userStop: make(chan struct{}), } w.emit = func(e watch.Event) { w.outgoing <- e } go w.translate() @@ -128,46 +126,36 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versio } // etcdWatch calls etcd's Watch function, and handles any errors. 
Meant to be called -// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will +// as a goroutine. func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) { defer util.HandleCrash() - defer close(w.etcdCallEnded) - go func() { - // This is racy; assume that Watch will fail within 100ms if it is going to fail. - // It's still more useful than blocking until the first result shows up. - // Trying to detect the 401: watch window expired error. - <-time.After(watchWaitDuration) - w.immediateError <- nil - }() + defer close(w.etcdError) if resourceVersion == 0 { - latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) - if !ok { + latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming) + if err != nil { + w.etcdError <- err return } resourceVersion = latest + 1 } _, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop) - if err != etcd.ErrWatchStoppedByUser { - glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key) - w.immediateError <- err + if err != nil && err != etcd.ErrWatchStoppedByUser { + w.etcdError <- err } } // etcdGetInitialWatchState turns an etcd Get request into a watch equivalent -func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) { - success = true - +func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { resp, err := client.Get(key, false, recursive) if err != nil { if !IsEtcdNotFound(err) { glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key) - success = false - return + return resourceVersion, err } if index, ok := etcdErrorIndex(err); ok { resourceVersion = index } - return + return resourceVersion, nil } resourceVersion = resp.EtcdIndex 
convertRecursiveResponse(resp.Node, resp, incoming) @@ -189,7 +177,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming incoming <- &copied } -// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be +// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be // called as a goroutine. func (w *etcdWatcher) translate() { defer close(w.outgoing) @@ -197,16 +185,26 @@ func (w *etcdWatcher) translate() { for { select { - case <-w.etcdCallEnded: + case err := <-w.etcdError: + if err != nil { + w.emit(watch.Event{ + watch.Error, + &api.Status{ + Status: api.StatusFailure, + Message: err.Error(), + }, + }) + } return case <-w.userStop: w.etcdStop <- true return case res, ok := <-w.etcdIncoming: - if !ok { - return + if ok { + w.sendResult(res) } - w.sendResult(res) + // If !ok, don't return here-- must wait for etcdError channel + // to give an error or be closed. } } } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 24f7a71722e..791c0f0fa53 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -205,10 +205,20 @@ func TestWatchEtcdError(t *testing.T) { fakeClient.WatchImmediateError = fmt.Errorf("immediate error") h := EtcdHelper{fakeClient, codec, versioner} - _, err := h.Watch("/some/key", 0) - if err == nil { + got := <-h.Watch("/some/key", 4).ResultChan() + if got.Type != watch.Error { t.Fatalf("Unexpected non-error") } + status, ok := got.Object.(*api.Status) + if !ok { + t.Fatalf("Unexpected non-error object type") + } + if status.Message != "immediate error" { + t.Errorf("Unexpected wrong error") + } + if status.Status != api.StatusFailure { + t.Errorf("Unexpected wrong error status") + } } func TestWatch(t *testing.T) { @@ -217,10 +227,7 @@ func TestWatch(t *testing.T) { fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} h := EtcdHelper{fakeClient, codec, versioner} - 
watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() // when server returns not found, the watch index starts at the next value (1) @@ -249,6 +256,17 @@ func TestWatch(t *testing.T) { // Test error case fakeClient.WatchInjectError <- fmt.Errorf("Injected error") + if errEvent, ok := <-watching.ResultChan(); !ok { + t.Errorf("no error result?") + } else { + if e, a := watch.Error, errEvent.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := "Injected error", errEvent.Object.(*api.Status).Message; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } + // Did everything shut down? if _, open := <-fakeClient.WatchResponse; open { t.Errorf("An injected error did not cause a graceful shutdown") @@ -349,11 +367,7 @@ func TestWatchEtcdState(t *testing.T) { fakeClient.Data[key] = value } h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/somekey/foo", testCase.From) - if err != nil { - t.Errorf("%s: unexpected error: %v", k, err) - continue - } + watching := h.Watch("/somekey/foo", testCase.From) fakeClient.WaitForWatchCompletion() t.Logf("Testing %v", k) @@ -421,10 +435,7 @@ func TestWatchFromZeroIndex(t *testing.T) { fakeClient.Data["/some/key"] = testCase.Response h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("%s: unexpected error: %v", k, err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a { @@ -525,10 +536,7 @@ func TestWatchFromNotFound(t *testing.T) { } h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() if fakeClient.WatchIndex != 3 { @@ -551,9 +559,14 
@@ func TestWatchFromOtherError(t *testing.T) { } h := EtcdHelper{fakeClient, codec, versioner} - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + watching := h.Watch("/some/key", 0) + + errEvent := <-watching.ResultChan() + if e, a := watch.Error, errEvent.Type; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := "101: () [2]", errEvent.Object.(*api.Status).Message; e != a { + t.Errorf("Expected %v, got %v", e, a) } select { @@ -576,10 +589,7 @@ func TestWatchPurposefulShutdown(t *testing.T) { fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} // Test purposeful shutdown - watching, err := h.Watch("/some/key", 0) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + watching := h.Watch("/some/key", 0) fakeClient.WaitForWatchCompletion() watching.Stop() From 8fd1fb43379f0708de9a0059f1e471134966545f Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Sep 2014 17:05:15 -0700 Subject: [PATCH 3/4] update pkg/api documentation --- pkg/watch/json/types.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/watch/json/types.go b/pkg/watch/json/types.go index 8851a09601f..0969abe8e89 100644 --- a/pkg/watch/json/types.go +++ b/pkg/watch/json/types.go @@ -33,6 +33,7 @@ type watchEvent struct { // For added or modified objects, this is the new object; for deleted objects, // it's the state of the object immediately prior to its deletion. + // For errors, it's an api.Status. 
Object runtime.RawExtension `json:"object,omitempty" yaml:"object,omitempty"` } From f211e46f20e6e6e86243cfd40612c6f6330a736b Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 22 Sep 2014 17:37:12 -0700 Subject: [PATCH 4/4] handle watch errors everywhere --- pkg/api/errors/errors.go | 11 +++++++++++ pkg/api/errors/errors_test.go | 21 +++++++++++++++++++++ pkg/client/cache/reflector.go | 8 ++++++-- pkg/kubelet/config/etcd.go | 12 ++++++------ pkg/registry/etcd/etcd.go | 4 ++-- test/integration/etcd_tools_test.go | 5 +---- 6 files changed, 47 insertions(+), 14 deletions(-) diff --git a/pkg/api/errors/errors.go b/pkg/api/errors/errors.go index 65791c0af7b..148b1068793 100644 --- a/pkg/api/errors/errors.go +++ b/pkg/api/errors/errors.go @@ -21,6 +21,7 @@ import ( "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) // statusError is an error intended for consumption by a REST API server. @@ -38,6 +39,16 @@ func (e *statusError) Status() api.Status { return e.status } +// FromObject generates a statusError from an api.Status, if that is the type of obj; otherwise, +// returns an error created by fmt.Errorf. +func FromObject(obj runtime.Object) error { + switch t := obj.(type) { + case *api.Status: + return &statusError{*t} + } + return fmt.Errorf("unexpected object: %v", obj) +} + // NewNotFound returns a new error which indicates that the resource of the kind and the name was not found. 
func NewNotFound(kind, name string) error { return &statusError{api.Status{ diff --git a/pkg/api/errors/errors_test.go b/pkg/api/errors/errors_test.go index cde0ea3dfb5..49edec7880d 100644 --- a/pkg/api/errors/errors_test.go +++ b/pkg/api/errors/errors_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) func TestErrorNew(t *testing.T) { @@ -131,3 +132,23 @@ func Test_reasonForError(t *testing.T) { t.Errorf("unexpected reason type: %#v", a) } } + +type TestType struct{} + +func (*TestType) IsAnAPIObject() {} + +func TestFromObject(t *testing.T) { + table := []struct { + obj runtime.Object + message string + }{ + {&api.Status{Message: "foobar"}, "foobar"}, + {&TestType{}, "unexpected object: &{}"}, + } + + for _, item := range table { + if e, a := item.message, FromObject(item.obj).Error(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 72ceb7b88ab..e0bc6ed4894 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -22,6 +22,7 @@ import ( "reflect" "time" + apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -101,7 +102,7 @@ func (r *Reflector) listAndWatch() { return } if err := r.watchHandler(w, &resourceVersion); err != nil { - glog.Errorf("failed to watch %v: %v", r.expectedType, err) + glog.Errorf("watch of %v ended with error: %v", r.expectedType, err) return } } @@ -131,6 +132,9 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err if !ok { break } + if event.Type == watch.Error { + return apierrs.FromObject(event.Object) + } if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { glog.Errorf("expected type %v, but watch 
event object had type %v", e, a) continue @@ -162,6 +166,6 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err glog.Errorf("unexpected watch close - watch lasted less than a second and no items received") return errors.New("very short watch") } - glog.Infof("unexpected watch close - %v total items received", eventCount) + glog.Infof("watch close - %v total items received", eventCount) return nil } diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index 1c4672c558d..4f16bd510d3 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -60,18 +60,18 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface } func (s *SourceEtcd) run() { - watching, err := s.helper.Watch(s.key, 0) - if err != nil { - glog.Errorf("Failed to initialize etcd watch: %v", err) - return - } + watching := s.helper.Watch(s.key, 0) for { select { case event, ok := <-watching.ResultChan(): if !ok { return } - + if event.Type == watch.Error { + glog.Errorf("Watch error: %v", event.Object) + watching.Stop() + return + } pods, err := eventToPods(event) if err != nil { glog.Errorf("Failed to parse result from etcd watch: %v", err) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 0b9f29081e7..88734a98cb3 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -343,7 +343,7 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u return nil, fmt.Errorf("label selectors are not supported on services") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceKey(value), resourceVersion) + return r.Watch(makeServiceKey(value), resourceVersion), nil } if field.Empty() { return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything) @@ -375,7 +375,7 @@ func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion return nil, fmt.Errorf("label selectors are not supported on 
endpoints") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceEndpointsKey(value), resourceVersion) + return r.Watch(makeServiceEndpointsKey(value), resourceVersion), nil } if field.Empty() { return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything) diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 87b8134b9c0..a13218df78d 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -101,10 +101,7 @@ func TestWatch(t *testing.T) { expectedVersion := resp.Node.ModifiedIndex // watch should load the object at the current index - w, err := helper.Watch(key, 0) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + w := helper.Watch(key, 0) event := <-w.ResultChan() if event.Type != watch.Added || event.Object == nil { t.Fatalf("expected first value to be set to ADDED, got %#v", event)