Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 03:41:45 +00:00)
Merge pull request #25945 from xiang90/fix_reflector
Automatic merge from submit-queue

cache: reflector should never stop watching

A recent change tried to separate resync from relist, the motivation being to avoid triggering a relist whenever a resync is required. However, the change is not effective, because it still stops the watcher. As hongchao mentioned in the original comment, today's storage interface does not deliver any progress notification to the watch channel, so a watcher that has received no events for the last few seconds cannot catch up from its previous index after a hard close: the index of the last received event has already fallen out of the watch cache window inside etcd2. This pull request fixes the issue by not stopping the watcher when a resync is required.

/cc @hongchaodeng @wojtek-t @timothysc @rrati @smarterclayton
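For orientation before the diff, here is a minimal sketch in Go of the pattern this change adopts: resync is driven from a background goroutine while the watch loop stays connected, and resync failures are surfaced through a small buffered error channel that the watch loop selects on. The names Store, listAndWatch, resyncPeriod, and the string event channel are placeholders for this sketch only, not the real reflector API; the actual code is in the diff below.

// Minimal, self-contained sketch (illustrative only): resync runs in its own
// goroutine and the watch loop is never stopped just to perform a resync.
package main

import (
	"fmt"
	"time"
)

// Store stands in for the reflector's cache store.
type Store struct{}

// Resync is a stand-in for the store's periodic resync.
func (s *Store) Resync() error { return nil }

func listAndWatch(store *Store, resyncPeriod time.Duration, events <-chan string, stopCh <-chan struct{}) error {
	// Buffered so the resync goroutine never blocks if the watch loop has
	// already returned.
	resyncerrc := make(chan error, 1)

	// Drive resyncs in the background instead of tearing down the watcher.
	go func() {
		ticker := time.NewTicker(resyncPeriod)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
			case <-stopCh:
				return
			}
			if err := store.Resync(); err != nil {
				resyncerrc <- err
				return
			}
		}
	}()

	// The watch loop ends only on stop, a resync error, or a closed event
	// channel; a pending resync no longer interrupts it.
	for {
		select {
		case <-stopCh:
			return nil
		case err := <-resyncerrc:
			return err
		case ev, ok := <-events:
			if !ok {
				return fmt.Errorf("watch channel closed")
			}
			_ = ev // handle the event and remember its resourceVersion
		}
	}
}

func main() {
	events := make(chan string)
	stopCh := make(chan struct{})
	go func() {
		events <- "ADDED pod/bar" // deliver one fake event...
		close(stopCh)             // ...then ask the loop to stop
	}()
	if err := listAndWatch(&Store{}, time.Second, events, stopCh); err != nil {
		fmt.Println("watch ended with:", err)
	}
}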
Commit 6528a4a6a3
pkg/client/cache/reflector.go (vendored, 83 changed lines)
@@ -246,35 +246,6 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
 	return t.C, t.Stop
 }
 
-// We want to avoid situations when periodic resyncing is breaking the TCP
-// connection.
-// If response`s body is not read to completion before calling body.Close(),
-// that TCP connection will not be reused in the future - see #15664 issue
-// for more details.
-// Thus, we set timeout for watch requests to be smaller than the remaining
-// time until next periodic resync and force resyncing ourself to avoid
-// breaking TCP connection.
-//
-// TODO: This should be parametrizable based on server load.
-func (r *Reflector) timeoutForWatch() *int64 {
-	randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
-	timeout := r.nextResync.Sub(r.now()) - timeoutThreshold
-	if timeout < 0 || randTimeout < timeout {
-		timeout = randTimeout
-	}
-	timeoutSeconds := int64(timeout.Seconds())
-	return &timeoutSeconds
-}
-
-// Returns true if we are close enough to next planned periodic resync
-// and we can force resyncing ourself now.
-func (r *Reflector) canForceResyncNow() bool {
-	if r.nextResync.IsZero() {
-		return false
-	}
-	return r.now().Add(forceResyncThreshold).After(r.nextResync)
-}
-
 // ListAndWatch first lists all items and get the resource version at the moment of call,
 // and then use the resource version to watch.
 // It returns error if ListAndWatch didn't even try to initialize watch.
@@ -306,13 +277,33 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 	}
 	r.setLastSyncResourceVersion(resourceVersion)
 
-	for {
-		options := api.ListOptions{
-			ResourceVersion: resourceVersion,
-			// We want to avoid situations when resyncing is breaking the TCP connection
-			// - see comment for 'timeoutForWatch()' for more details.
-			TimeoutSeconds: r.timeoutForWatch(),
+	resyncerrc := make(chan error, 1)
+	go func() {
+		for {
+			select {
+			case <-resyncCh:
+			case <-stopCh:
+				return
+			}
+			glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
+			if err := r.store.Resync(); err != nil {
+				resyncerrc <- err
+				return
+			}
+			cleanup()
+			resyncCh, cleanup = r.resyncChan()
+		}
+	}()
+
+	for {
+		timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
+		options = api.ListOptions{
+			ResourceVersion: resourceVersion,
+			// We want to avoid situations of hanging watchers. Stop any wachers that do not
+			// receive any events within the timeout window.
+			TimeoutSeconds: &timemoutseconds,
 		}
 
 		w, err := r.listerWatcher.Watch(options)
 		if err != nil {
 			switch err {
@@ -338,21 +329,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 			return nil
 		}
 
-		if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
-			if err != errorResyncRequested && err != errorStopRequested {
+		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
+			if err != errorStopRequested {
 				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
 			}
-			if err != errorResyncRequested {
-				return nil
-			}
-		}
-		if r.canForceResyncNow() {
-			glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
-			if err := r.store.Resync(); err != nil {
-				return err
-			}
-			cleanup()
-			resyncCh, cleanup = r.resyncChan()
+			return nil
 		}
 	}
 }
@@ -367,7 +348,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
 }
 
 // watchHandler watches w and keeps *resourceVersion up to date.
-func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
+func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
 	start := time.Now()
 	eventCount := 0
 
@@ -380,8 +361,8 @@ loop:
 		select {
 		case <-stopCh:
 			return errorStopRequested
-		case <-resyncCh:
-			return errorResyncRequested
+		case err := <-errc:
+			return err
 		case event, ok := <-w.ResultChan():
 			if !ok {
 				break loop
pkg/client/cache/reflector_test.go (vendored, 10 changed lines)
@@ -30,6 +30,8 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )
 
+var nevererrc chan error
+
 type testLW struct {
 	ListFunc  func() (runtime.Object, error)
 	WatchFunc func(options api.ListOptions) (watch.Interface, error)
@@ -84,7 +86,7 @@ func TestRunUntil(t *testing.T) {
 	// Synchronously add a dummy pod into the watch channel so we
 	// know the RunUntil go routine is in the watch handler.
 	fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
-	stopCh <- struct{}{}
+	close(stopCh)
 	select {
 	case _, ok := <-fw.ResultChan():
 		if ok {
@@ -129,7 +131,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
 		fw.Stop()
 	}()
 	var resumeRV string
-	err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop)
+	err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
 	if err == nil {
 		t.Errorf("unexpected non-error")
 	}
@@ -149,7 +151,7 @@ func TestReflectorWatchHandler(t *testing.T) {
 		fw.Stop()
 	}()
 	var resumeRV string
-	err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop)
+	err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
 	if err != nil {
 		t.Errorf("unexpected error %v", err)
 	}
@@ -198,7 +200,7 @@ func TestReflectorStopWatch(t *testing.T) {
 	var resumeRV string
 	stopWatch := make(chan struct{}, 1)
 	stopWatch <- struct{}{}
-	err := g.watchHandler(fw, &resumeRV, neverExitWatch, stopWatch)
+	err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
 	if err != errorStopRequested {
 		t.Errorf("expected stop error, got %q", err)
 	}