diff --git a/rest/request.go b/rest/request.go index 0570615fc..8056db97a 100644 --- a/rest/request.go +++ b/rest/request.go @@ -582,7 +582,7 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) if err != nil { // The watch stream mechanism handles many common partial data errors, so closed // connections can be retried in many cases. - if net.IsProbableEOF(err) { + if net.IsProbableEOF(err) || net.IsTimeout(err) { return watch.NewEmptyWatch(), nil } return nil, err diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 2daa44ba5..59806cd47 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -272,6 +272,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { AllowWatchBookmarks: false, } + // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent + start := r.clock.Now() w, err := r.listerWatcher.Watch(options) if err != nil { switch err { @@ -297,7 +299,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { return nil } - if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { + if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } @@ -316,8 +318,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err } // watchHandler watches w and keeps *resourceVersion up to date. -func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { - start := r.clock.Now() +func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { eventCount := 0 // Stopping the watcher should be idempotent and if we return from this function there's no way diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index caa072323..0153c924c 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -132,7 +132,7 @@ func TestReflectorWatchHandlerError(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -152,7 +152,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -201,7 +201,7 @@ func TestReflectorStopWatch(t *testing.T) { var resumeRV string stopWatch := make(chan struct{}, 1) stopWatch <- struct{}{} - err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) } diff --git a/tools/watch/retrywatcher.go b/tools/watch/retrywatcher.go index 47ae9df4a..edf28d164 100644 --- a/tools/watch/retrywatcher.go +++ b/tools/watch/retrywatcher.go @@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { default: msg := "Watch failed: %v" - if net.IsProbableEOF(err) { + if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof(msg, err) // Retry return false, 0