diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go
index 159f35f4d11..3a5025a28a7 100644
--- a/pkg/client/cache/reflector.go
+++ b/pkg/client/cache/reflector.go
@@ -246,35 +246,6 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
 	return t.C, t.Stop
 }
 
-// We want to avoid situations when periodic resyncing is breaking the TCP
-// connection.
-// If response`s body is not read to completion before calling body.Close(),
-// that TCP connection will not be reused in the future - see #15664 issue
-// for more details.
-// Thus, we set timeout for watch requests to be smaller than the remaining
-// time until next periodic resync and force resyncing ourself to avoid
-// breaking TCP connection.
-//
-// TODO: This should be parametrizable based on server load.
-func (r *Reflector) timeoutForWatch() *int64 {
-	randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
-	timeout := r.nextResync.Sub(r.now()) - timeoutThreshold
-	if timeout < 0 || randTimeout < timeout {
-		timeout = randTimeout
-	}
-	timeoutSeconds := int64(timeout.Seconds())
-	return &timeoutSeconds
-}
-
-// Returns true if we are close enough to next planned periodic resync
-// and we can force resyncing ourself now.
-func (r *Reflector) canForceResyncNow() bool {
-	if r.nextResync.IsZero() {
-		return false
-	}
-	return r.now().Add(forceResyncThreshold).After(r.nextResync)
-}
-
 // ListAndWatch first lists all items and get the resource version at the moment of call,
 // and then use the resource version to watch.
 // It returns error if ListAndWatch didn't even try to initialize watch.
@@ -306,13 +277,33 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 	}
 	r.setLastSyncResourceVersion(resourceVersion)
 
-	for {
-		options := api.ListOptions{
-			ResourceVersion: resourceVersion,
-			// We want to avoid situations when resyncing is breaking the TCP connection
-			// - see comment for 'timeoutForWatch()' for more details.
-			TimeoutSeconds: r.timeoutForWatch(),
+	resyncerrc := make(chan error, 1)
+	go func() {
+		for {
+			select {
+			case <-resyncCh:
+			case <-stopCh:
+				return
+			}
+			glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
+			if err := r.store.Resync(); err != nil {
+				resyncerrc <- err
+				return
+			}
+			cleanup()
+			resyncCh, cleanup = r.resyncChan()
 		}
+	}()
+
+	for {
+		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
+		options = api.ListOptions{
+			ResourceVersion: resourceVersion,
+			// We want to avoid situations of hanging watchers. Stop any watchers that do not
+			// receive any events within the timeout window.
+			TimeoutSeconds: &timeoutSeconds,
+		}
+
 		w, err := r.listerWatcher.Watch(options)
 		if err != nil {
 			switch err {
@@ -338,21 +329,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 				return nil
 			}
 
-		if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
-			if err != errorResyncRequested && err != errorStopRequested {
+		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
+			if err != errorStopRequested {
 				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
 			}
-			if err != errorResyncRequested {
-				return nil
-			}
-		}
-		if r.canForceResyncNow() {
-			glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
-			if err := r.store.Resync(); err != nil {
-				return err
-			}
-			cleanup()
-			resyncCh, cleanup = r.resyncChan()
+			return nil
 		}
 	}
 }
@@ -367,7 +348,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
 }
 
 // watchHandler watches w and keeps *resourceVersion up to date.
-func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
+func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
 	start := time.Now()
 	eventCount := 0
 
@@ -380,8 +361,8 @@ loop:
 		select {
 		case <-stopCh:
 			return errorStopRequested
-		case <-resyncCh:
-			return errorResyncRequested
+		case err := <-errc:
+			return err
 		case event, ok := <-w.ResultChan():
 			if !ok {
 				break loop
diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go
index 8937ce17d9a..0e1e48ec42a 100644
--- a/pkg/client/cache/reflector_test.go
+++ b/pkg/client/cache/reflector_test.go
@@ -30,6 +30,8 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )
 
+var nevererrc chan error
+
 type testLW struct {
 	ListFunc  func() (runtime.Object, error)
 	WatchFunc func(options api.ListOptions) (watch.Interface, error)
@@ -84,7 +86,7 @@ func TestRunUntil(t *testing.T) {
 	// Synchronously add a dummy pod into the watch channel so we
 	// know the RunUntil go routine is in the watch handler.
 	fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
-	stopCh <- struct{}{}
+	close(stopCh)
 	select {
 	case _, ok := <-fw.ResultChan():
 		if ok {
@@ -129,7 +131,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
 		fw.Stop()
 	}()
 	var resumeRV string
-	err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop)
+	err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
 	if err == nil {
 		t.Errorf("unexpected non-error")
 	}
@@ -149,7 +151,7 @@ func TestReflectorWatchHandler(t *testing.T) {
 		fw.Stop()
 	}()
 	var resumeRV string
-	err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop)
+	err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
 	if err != nil {
 		t.Errorf("unexpected error %v", err)
 	}
@@ -198,7 +200,7 @@ func TestReflectorStopWatch(t *testing.T) {
 	var resumeRV string
 	stopWatch := make(chan struct{}, 1)
 	stopWatch <- struct{}{}
-	err := g.watchHandler(fw, &resumeRV, neverExitWatch, stopWatch)
+	err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
 	if err != errorStopRequested {
 		t.Errorf("expected stop error, got %q", err)
 	}
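
For reference, below is a minimal standalone Go sketch of the pattern this patch introduces; it is not part of the change and uses hypothetical names (fakeResync, resyncPeriod). It shows a background goroutine owning the resync timer and reporting Resync failures over a buffered error channel that the watch loop selects on, plus the randomized per-watch timeout in [minWatchTimeout, 2*minWatchTimeout).

// Illustrative sketch only -- not part of the patch.
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

const minWatchTimeout = 5 * time.Minute
const resyncPeriod = 50 * time.Millisecond

// resyncChan mirrors Reflector.resyncChan: a timer channel plus its stop function.
func resyncChan() (<-chan time.Time, func() bool) {
	t := time.NewTimer(resyncPeriod)
	return t.C, t.Stop
}

// fakeResync stands in for store.Resync(); it always fails here to show error propagation.
func fakeResync() error {
	return errors.New("resync failed")
}

func main() {
	stopCh := make(chan struct{})
	resyncerrc := make(chan error, 1) // buffered: the resync goroutine never blocks on send

	resyncCh, cleanup := resyncChan()
	go func() {
		for {
			select {
			case <-resyncCh: // time to resync
			case <-stopCh:
				return
			}
			if err := fakeResync(); err != nil {
				resyncerrc <- err // surface the failure to the watch loop
				return
			}
			cleanup()
			resyncCh, cleanup = resyncChan() // arm the next resync
		}
	}()

	// One iteration of the watch loop: pick a randomized timeout so that watches
	// started together do not all expire at the same instant.
	timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
	fmt.Println("would start watch with TimeoutSeconds =", timeoutSeconds)

	// In place of a real watch result channel, wait for either a resync error or a
	// deadline, the same way watchHandler selects on errc alongside w.ResultChan().
	select {
	case err := <-resyncerrc:
		fmt.Println("watch ends, resync error:", err)
	case <-time.After(time.Second):
		fmt.Println("no resync error within a second")
	}
	close(stopCh)
}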