diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 7c1dd0d28e2..97ab5470ec2 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -63,6 +63,8 @@ type Reflector struct { // the beginning of the next one. period time.Duration resyncPeriod time.Duration + // now() returns current time - exposed for testing purposes + now func() time.Time // nextResync is approximate time of next resync (0 if not scheduled) nextResync time.Time // lastSyncResourceVersion is the resource version token last @@ -79,8 +81,6 @@ var ( // However, it can be modified to avoid periodic resync to break the // TCP connection. minWatchTimeout = 5 * time.Minute - - now func() time.Time = time.Now // If we are within 'forceResyncThreshold' from the next planned resync // and are just before issueing Watch(), resync will be forced now. forceResyncThreshold = 3 * time.Second @@ -116,6 +116,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, expectedType: reflect.TypeOf(expectedType), period: time.Second, resyncPeriod: resyncPeriod, + now: time.Now, } return r } @@ -187,7 +188,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // always fail so we end up listing frequently. Then, if we don't // manually stop the timer, we could end up with many timers active // concurrently. - r.nextResync = now().Add(r.resyncPeriod) + r.nextResync = r.now().Add(r.resyncPeriod) t := time.NewTimer(r.resyncPeriod) return t.C, t.Stop } @@ -204,7 +205,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { // TODO: This should be parametrizable based on server load. func (r *Reflector) timeoutForWatch() *int64 { randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0)) - timeout := r.nextResync.Sub(now()) - timeoutThreshold + timeout := r.nextResync.Sub(r.now()) - timeoutThreshold if timeout < 0 || randTimeout < timeout { timeout = randTimeout } @@ -218,7 +219,7 @@ func (r *Reflector) canForceResyncNow() bool { if r.nextResync.IsZero() { return false } - return now().Add(forceResyncThreshold).After(r.nextResync) + return r.now().Add(forceResyncThreshold).After(r.nextResync) } // Returns error if ListAndWatch didn't even tried to initialize watch. diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 17c332ca37e..b4c0da0b243 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -367,7 +367,6 @@ func TestReflectorResync(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) currentTime := time.Time{} - now = func() time.Time { return currentTime } iteration := 0 lw := &testLW{ @@ -396,6 +395,7 @@ func TestReflectorResync(t *testing.T) { } resyncPeriod := time.Minute r := NewReflector(lw, &api.Pod{}, s, resyncPeriod) + r.now = func() time.Time { return currentTime } r.ListAndWatch(util.NeverStop) if iteration != 2 {