Merge pull request #18499 from wojtek-t/fix_reflector_race

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-12-15 01:19:06 -08:00
commit 06df5c97e7
2 changed files with 7 additions and 6 deletions

View File

@ -63,6 +63,8 @@ type Reflector struct {
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
// now() returns current time - exposed for testing purposes
now func() time.Time
// nextResync is approximate time of next resync (0 if not scheduled)
nextResync time.Time
// lastSyncResourceVersion is the resource version token last
@ -79,8 +81,6 @@ var (
// However, it can be modified to avoid periodic resync to break the
// TCP connection.
minWatchTimeout = 5 * time.Minute
now func() time.Time = time.Now
// If we are within 'forceResyncThreshold' from the next planned resync
// and are just before issueing Watch(), resync will be forced now.
forceResyncThreshold = 3 * time.Second
@ -116,6 +116,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
expectedType: reflect.TypeOf(expectedType),
period: time.Second,
resyncPeriod: resyncPeriod,
now: time.Now,
}
return r
}
@ -187,7 +188,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// always fail so we end up listing frequently. Then, if we don't
// manually stop the timer, we could end up with many timers active
// concurrently.
r.nextResync = now().Add(r.resyncPeriod)
r.nextResync = r.now().Add(r.resyncPeriod)
t := time.NewTimer(r.resyncPeriod)
return t.C, t.Stop
}
@ -204,7 +205,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// TODO: This should be parametrizable based on server load.
func (r *Reflector) timeoutForWatch() *int64 {
randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
timeout := r.nextResync.Sub(now()) - timeoutThreshold
timeout := r.nextResync.Sub(r.now()) - timeoutThreshold
if timeout < 0 || randTimeout < timeout {
timeout = randTimeout
}
@ -218,7 +219,7 @@ func (r *Reflector) canForceResyncNow() bool {
if r.nextResync.IsZero() {
return false
}
return now().Add(forceResyncThreshold).After(r.nextResync)
return r.now().Add(forceResyncThreshold).After(r.nextResync)
}
// Returns error if ListAndWatch didn't even tried to initialize watch.

View File

@ -367,7 +367,6 @@ func TestReflectorResync(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
currentTime := time.Time{}
now = func() time.Time { return currentTime }
iteration := 0
lw := &testLW{
@ -396,6 +395,7 @@ func TestReflectorResync(t *testing.T) {
}
resyncPeriod := time.Minute
r := NewReflector(lw, &api.Pod{}, s, resyncPeriod)
r.now = func() time.Time { return currentTime }
r.ListAndWatch(util.NeverStop)
if iteration != 2 {