From 4d7c6e2657a04d581851cd00f972451c906eb2b6 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 19 Sep 2014 18:09:40 -0700 Subject: [PATCH] Treat super short watches with no items received the same as watches that error immediately --- pkg/client/cache/reflector.go | 22 ++++++++++++++++++---- pkg/client/cache/reflector_test.go | 19 ++++++++++++++++++- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 1e309975048..72ceb7b88ab 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "errors" "fmt" "reflect" "time" @@ -99,7 +100,10 @@ func (r *Reflector) listAndWatch() { glog.Errorf("failed to watch %v: %v", r.expectedType, err) return } - r.watchHandler(w, &resourceVersion) + if err := r.watchHandler(w, &resourceVersion); err != nil { + glog.Errorf("failed to watch %v: %v", r.expectedType, err) + return + } } } @@ -119,12 +123,13 @@ func (r *Reflector) syncWith(items []runtime.Object) error { } // watchHandler watches w and keeps *resourceVersion up to date. -func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { +func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) error { + start := time.Now() + eventCount := 0 for { event, ok := <-w.ResultChan() if !ok { - glog.Errorf("unexpected watch close") - return + break } if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { glog.Errorf("expected type %v, but watch event object had type %v", e, a) @@ -149,5 +154,14 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { glog.Errorf("unable to understand watch event %#v", event) } *resourceVersion = jsonBase.ResourceVersion() + 1 + eventCount++ } + + watchDuration := time.Now().Sub(start) + if watchDuration < 1*time.Second && eventCount == 0 { + glog.Errorf("unexpected watch close - watch lasted less than a second and no items received") + return errors.New("very short watch") + } + glog.Infof("unexpected watch close - %v total items received", eventCount) + return nil } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 60963d793af..df706c3b1e6 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -35,6 +35,20 @@ func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) { return t.WatchFunc(resourceVersion) } +func TestReflector_watchHandlerError(t *testing.T) { + s := NewStore() + g := NewReflector(&testLW{}, &api.Pod{}, s) + fw := watch.NewFake() + go func() { + fw.Stop() + }() + var resumeRV uint64 + err := g.watchHandler(fw, &resumeRV) + if err == nil { + t.Errorf("unexpected non-error") + } +} + func TestReflector_watchHandler(t *testing.T) { s := NewStore() g := NewReflector(&testLW{}, &api.Pod{}, s) @@ -49,7 +63,10 @@ func TestReflector_watchHandler(t *testing.T) { fw.Stop() }() var resumeRV uint64 - g.watchHandler(fw, &resumeRV) + err := g.watchHandler(fw, &resumeRV) + if err != nil { + t.Errorf("unexpected error %v", err) + } table := []struct { ID string