diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index c3ddffa9125..7fc919e0681 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -140,30 +140,41 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { type immediateTickerFactory struct{} -func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker { - return &immediateTicker{stopCh: make(chan struct{})} +func (t *immediateTickerFactory) NewTimer(d time.Duration) clock.Timer { + timer := immediateTicker{ + c: make(chan time.Time), + } + timer.Reset(d) + return &timer } type immediateTicker struct { - stopCh chan struct{} + c chan time.Time +} + +func (t *immediateTicker) Reset(d time.Duration) (active bool) { + select { + case <-t.c: + active = true + default: + } + go func() { + t.c <- time.Now() + }() + return active } func (t *immediateTicker) C() <-chan time.Time { - ch := make(chan time.Time) - go func() { - for { - select { - case ch <- time.Now(): - case <-t.stopCh: - return - } - } - }() - return ch + return t.c } -func (t *immediateTicker) Stop() { - close(t.stopCh) +func (t *immediateTicker) Stop() bool { + select { + case <-t.c: + return true + default: + return false + } } func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go index 13f50bc187d..5e8d7d1f4b7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go @@ -49,7 +49,7 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester type WatchProgressRequester func(ctx context.Context) error type TickerFactory interface { - NewTicker(time.Duration) clock.Ticker + NewTimer(time.Duration) clock.Timer } // conditionalProgressRequester will request progress notification if there @@ -78,8 +78,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { pr.stopped = true pr.cond.Signal() }() - ticker := pr.clock.NewTicker(progressRequestPeriod) - defer ticker.Stop() + timer := pr.clock.NewTimer(progressRequestPeriod) + defer timer.Stop() for { stopped := func() bool { pr.mux.RLock() @@ -94,7 +94,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { } select { - case <-ticker.C(): + case <-timer.C(): + timer.Reset(progressRequestPeriod) shouldRequest := func() bool { pr.mux.RLock() defer pr.mux.RUnlock()