handle watch errors everywhere

This commit is contained in:
Daniel Smith 2014-09-22 17:37:12 -07:00
parent 8fd1fb4337
commit f211e46f20
6 changed files with 47 additions and 14 deletions

View File

@ -21,6 +21,7 @@ import (
"net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
// statusError is an error intended for consumption by a REST API server.
@ -38,6 +39,16 @@ func (e *statusError) Status() api.Status {
return e.status
}
// FromObject generates an statusError from an api.Status, if that is the type of obj; otherwise,
// returns an error created by fmt.Errorf.
func FromObject(obj runtime.Object) error {
switch t := obj.(type) {
case *api.Status:
return &statusError{*t}
}
return fmt.Errorf("unexpected object: %v", obj)
}
// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
func NewNotFound(kind, name string) error {
return &statusError{api.Status{

View File

@ -23,6 +23,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)
func TestErrorNew(t *testing.T) {
@ -131,3 +132,23 @@ func Test_reasonForError(t *testing.T) {
t.Errorf("unexpected reason type: %#v", a)
}
}
type TestType struct{}
func (*TestType) IsAnAPIObject() {}
func TestFromObject(t *testing.T) {
table := []struct {
obj runtime.Object
message string
}{
{&api.Status{Message: "foobar"}, "foobar"},
{&TestType{}, "unexpected object: &{}"},
}
for _, item := range table {
if e, a := item.message, FromObject(item.obj).Error(); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}

View File

@ -22,6 +22,7 @@ import (
"reflect"
"time"
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -101,7 +102,7 @@ func (r *Reflector) listAndWatch() {
return
}
if err := r.watchHandler(w, &resourceVersion); err != nil {
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
return
}
}
@ -131,6 +132,9 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err
if !ok {
break
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
continue
@ -162,6 +166,6 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
return errors.New("very short watch")
}
glog.Infof("unexpected watch close - %v total items received", eventCount)
glog.Infof("watch close - %v total items received", eventCount)
return nil
}

View File

@ -60,18 +60,18 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface
}
func (s *SourceEtcd) run() {
watching, err := s.helper.Watch(s.key, 0)
if err != nil {
glog.Errorf("Failed to initialize etcd watch: %v", err)
return
}
watching := s.helper.Watch(s.key, 0)
for {
select {
case event, ok := <-watching.ResultChan():
if !ok {
return
}
if event.Type == watch.Error {
glog.Errorf("Watch error: %v", event.Object)
watching.Stop()
return
}
pods, err := eventToPods(event)
if err != nil {
glog.Errorf("Failed to parse result from etcd watch: %v", err)

View File

@ -343,7 +343,7 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
return nil, fmt.Errorf("label selectors are not supported on services")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceKey(value), resourceVersion)
return r.Watch(makeServiceKey(value), resourceVersion), nil
}
if field.Empty() {
return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything)
@ -375,7 +375,7 @@ func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion
return nil, fmt.Errorf("label selectors are not supported on endpoints")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceEndpointsKey(value), resourceVersion)
return r.Watch(makeServiceEndpointsKey(value), resourceVersion), nil
}
if field.Empty() {
return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything)

View File

@ -101,10 +101,7 @@ func TestWatch(t *testing.T) {
expectedVersion := resp.Node.ModifiedIndex
// watch should load the object at the current index
w, err := helper.Watch(key, 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
w := helper.Watch(key, 0)
event := <-w.ResultChan()
if event.Type != watch.Added || event.Object == nil {
t.Fatalf("expected first value to be set to ADDED, got %#v", event)