diff --git a/pkg/api/errors/errors.go b/pkg/api/errors/errors.go index 65791c0af7b..148b1068793 100644 --- a/pkg/api/errors/errors.go +++ b/pkg/api/errors/errors.go @@ -21,6 +21,7 @@ import ( "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) // statusError is an error intended for consumption by a REST API server. @@ -38,6 +39,16 @@ func (e *statusError) Status() api.Status { return e.status } +// FromObject generates an statusError from an api.Status, if that is the type of obj; otherwise, +// returns an error created by fmt.Errorf. +func FromObject(obj runtime.Object) error { + switch t := obj.(type) { + case *api.Status: + return &statusError{*t} + } + return fmt.Errorf("unexpected object: %v", obj) +} + // NewNotFound returns a new error which indicates that the resource of the kind and the name was not found. func NewNotFound(kind, name string) error { return &statusError{api.Status{ diff --git a/pkg/api/errors/errors_test.go b/pkg/api/errors/errors_test.go index cde0ea3dfb5..49edec7880d 100644 --- a/pkg/api/errors/errors_test.go +++ b/pkg/api/errors/errors_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) func TestErrorNew(t *testing.T) { @@ -131,3 +132,23 @@ func Test_reasonForError(t *testing.T) { t.Errorf("unexpected reason type: %#v", a) } } + +type TestType struct{} + +func (*TestType) IsAnAPIObject() {} + +func TestFromObject(t *testing.T) { + table := []struct { + obj runtime.Object + message string + }{ + {&api.Status{Message: "foobar"}, "foobar"}, + {&TestType{}, "unexpected object: &{}"}, + } + + for _, item := range table { + if e, a := item.message, FromObject(item.obj).Error(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 72ceb7b88ab..e0bc6ed4894 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -22,6 +22,7 @@ import ( "reflect" "time" + apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -101,7 +102,7 @@ func (r *Reflector) listAndWatch() { return } if err := r.watchHandler(w, &resourceVersion); err != nil { - glog.Errorf("failed to watch %v: %v", r.expectedType, err) + glog.Errorf("watch of %v ended with error: %v", r.expectedType, err) return } } @@ -131,6 +132,9 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err if !ok { break } + if event.Type == watch.Error { + return apierrs.FromObject(event.Object) + } if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { glog.Errorf("expected type %v, but watch event object had type %v", e, a) continue @@ -162,6 +166,6 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err glog.Errorf("unexpected watch close - watch lasted less than a second and no items received") return errors.New("very short watch") } - glog.Infof("unexpected watch close - %v total items received", eventCount) + glog.Infof("watch close - %v total items received", eventCount) return nil } diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index 1c4672c558d..4f16bd510d3 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -60,18 +60,18 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface } func (s *SourceEtcd) run() { - watching, err := s.helper.Watch(s.key, 0) - if err != nil { - glog.Errorf("Failed to initialize etcd watch: %v", err) - return - } + watching := s.helper.Watch(s.key, 0) for { select { case event, ok := <-watching.ResultChan(): if !ok { return } - + if event.Type == watch.Error { + glog.Errorf("Watch error: %v", event.Object) + watching.Stop() + return + } pods, err := eventToPods(event) if err != nil { glog.Errorf("Failed to parse result from etcd watch: %v", err) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 0b9f29081e7..88734a98cb3 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -343,7 +343,7 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u return nil, fmt.Errorf("label selectors are not supported on services") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceKey(value), resourceVersion) + return r.Watch(makeServiceKey(value), resourceVersion), nil } if field.Empty() { return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything) @@ -375,7 +375,7 @@ func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion return nil, fmt.Errorf("label selectors are not supported on endpoints") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceEndpointsKey(value), resourceVersion) + return r.Watch(makeServiceEndpointsKey(value), resourceVersion), nil } if field.Empty() { return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything) diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 87b8134b9c0..a13218df78d 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -101,10 +101,7 @@ func TestWatch(t *testing.T) { expectedVersion := resp.Node.ModifiedIndex // watch should load the object at the current index - w, err := helper.Watch(key, 0) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + w := helper.Watch(key, 0) event := <-w.ResultChan() if event.Type != watch.Added || event.Object == nil { t.Fatalf("expected first value to be set to ADDED, got %#v", event)