diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 4b66c2c0f..9ea9e7c4a 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -352,7 +352,7 @@ }, { "ImportPath": "k8s.io/apimachinery", - "Rev": "fa0d5bf06730" + "Rev": "591a38b7a7b7" }, { "ImportPath": "k8s.io/gengo", diff --git a/go.mod b/go.mod index dfae0fa43..bec8df72c 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/appengine v1.5.0 // indirect k8s.io/api v0.0.0-20200403220253-fa879b399cd0 - k8s.io/apimachinery v0.0.0-20200403220105-fa0d5bf06730 + k8s.io/apimachinery v0.0.0-20200407101112-591a38b7a7b7 k8s.io/klog v1.0.0 k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89 sigs.k8s.io/yaml v1.2.0 @@ -39,5 +39,5 @@ replace ( golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13 golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13 k8s.io/api => k8s.io/api v0.0.0-20200403220253-fa879b399cd0 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200403220105-fa0d5bf06730 + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200407101112-591a38b7a7b7 ) diff --git a/go.sum b/go.sum index ebf8d232f..fad76fb5f 100644 --- a/go.sum +++ b/go.sum @@ -188,7 +188,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.0.0-20200403220253-fa879b399cd0/go.mod h1:IHUHI0RKOjAF2/LmT+5g7hlTufmlzdZqccff7saac3A= -k8s.io/apimachinery v0.0.0-20200403220105-fa0d5bf06730/go.mod h1:plsINh5MKcZh761kXB2H+kXnD8vwFIAy7bitct1VPNU= +k8s.io/apimachinery v0.0.0-20200407101112-591a38b7a7b7/go.mod h1:plsINh5MKcZh761kXB2H+kXnD8vwFIAy7bitct1VPNU= k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= diff --git a/rest/request.go b/rest/request.go index fe048ec42..96fb19ce8 100644 --- a/rest/request.go +++ b/rest/request.go @@ -669,7 +669,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { if err != nil { // The watch stream mechanism handles many common partial data errors, so closed // connections can be retried in many cases. - if net.IsProbableEOF(err) { + if net.IsProbableEOF(err) || net.IsTimeout(err) { return watch.NewEmptyWatch(), nil } return nil, err diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 7c581ceda..8dfc41694 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -396,6 +396,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { AllowWatchBookmarks: true, } + // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent + start := r.clock.Now() w, err := r.listerWatcher.Watch(options) if err != nil { // If this is "connection refused" error, it means that most likely apiserver is not responsive. @@ -409,7 +411,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { return err } - if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { + if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { case isExpiredError(err): @@ -436,8 +438,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err } // watchHandler watches w and keeps *resourceVersion up to date. -func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { - start := r.clock.Now() +func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { eventCount := 0 // Stopping the watcher should be idempotent and if we return from this function there's no way diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 59b66256d..3e2fcc19b 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) { var resumeRV string stopWatch := make(chan struct{}, 1) stopWatch <- struct{}{} - err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) } diff --git a/tools/watch/retrywatcher.go b/tools/watch/retrywatcher.go index 47ae9df4a..edf28d164 100644 --- a/tools/watch/retrywatcher.go +++ b/tools/watch/retrywatcher.go @@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { default: msg := "Watch failed: %v" - if net.IsProbableEOF(err) { + if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof(msg, err) // Retry return false, 0