Fix client watch reestablishment handling of client-side timeouts

Kubernetes-commit: 343c1e7636fe5c75cdd378c0b170b26935806de5
This commit is contained in:
Jordan Liggitt 2020-03-30 10:36:01 -04:00 committed by Kubernetes Publisher
parent fe32aa3b94
commit 33c1f9f2a4
4 changed files with 9 additions and 8 deletions

View File

@ -669,7 +669,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
if err != nil { if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed // The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases. // connections can be retried in many cases.
if net.IsProbableEOF(err) { if net.IsProbableEOF(err) || net.IsTimeout(err) {
return watch.NewEmptyWatch(), nil return watch.NewEmptyWatch(), nil
} }
return nil, err return nil, err

View File

@ -396,6 +396,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
AllowWatchBookmarks: true, AllowWatchBookmarks: true,
} }
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options) w, err := r.listerWatcher.Watch(options)
if err != nil { if err != nil {
// If this is "connection refused" error, it means that most likely apiserver is not responsive. // If this is "connection refused" error, it means that most likely apiserver is not responsive.
@ -409,7 +411,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return err return err
} }
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested { if err != errorStopRequested {
switch { switch {
case isExpiredError(err): case isExpiredError(err):
@ -436,8 +438,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
} }
// watchHandler watches w and keeps *resourceVersion up to date. // watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
eventCount := 0 eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way // Stopping the watcher should be idempotent and if we return from this function there's no way

View File

@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
fw.Stop() fw.Stop()
}() }()
var resumeRV string var resumeRV string
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
} }
@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Stop() fw.Stop()
}() }()
var resumeRV string var resumeRV string
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
if err != nil { if err != nil {
t.Errorf("unexpected error %v", err) t.Errorf("unexpected error %v", err)
} }
@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
var resumeRV string var resumeRV string
stopWatch := make(chan struct{}, 1) stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{} stopWatch <- struct{}{}
err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch) err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
if err != errorStopRequested { if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err) t.Errorf("expected stop error, got %q", err)
} }

View File

@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
default: default:
msg := "Watch failed: %v" msg := "Watch failed: %v"
if net.IsProbableEOF(err) { if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof(msg, err) klog.V(5).Infof(msg, err)
// Retry // Retry
return false, 0 return false, 0