diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index c3ddffa9125..7fc919e0681 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -140,30 +140,41 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { type immediateTickerFactory struct{} -func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker { - return &immediateTicker{stopCh: make(chan struct{})} +func (t *immediateTickerFactory) NewTimer(d time.Duration) clock.Timer { + timer := immediateTicker{ + c: make(chan time.Time), + } + timer.Reset(d) + return &timer } type immediateTicker struct { - stopCh chan struct{} + c chan time.Time +} + +func (t *immediateTicker) Reset(d time.Duration) (active bool) { + select { + case <-t.c: + active = true + default: + } + go func() { + t.c <- time.Now() + }() + return active } func (t *immediateTicker) C() <-chan time.Time { - ch := make(chan time.Time) - go func() { - for { - select { - case ch <- time.Now(): - case <-t.stopCh: - return - } - } - }() - return ch + return t.c } -func (t *immediateTicker) Stop() { - close(t.stopCh) +func (t *immediateTicker) Stop() bool { + select { + case <-t.c: + return true + default: + return false + } } func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go index 13f50bc187d..087fb14e546 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go @@ -42,14 +42,14 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester requestWatchProgress: requestWatchProgress, contextMetadata: contextMetadata, } - pr.cond = sync.NewCond(pr.mux.RLocker()) + pr.cond = sync.NewCond(&pr.mux) return pr } type WatchProgressRequester func(ctx context.Context) error type TickerFactory interface { - NewTicker(time.Duration) clock.Ticker + NewTimer(time.Duration) clock.Timer } // conditionalProgressRequester will request progress notification if there @@ -59,7 +59,7 @@ type conditionalProgressRequester struct { requestWatchProgress WatchProgressRequester contextMetadata metadata.MD - mux sync.RWMutex + mux sync.Mutex cond *sync.Cond waiting int stopped bool @@ -78,12 +78,12 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { pr.stopped = true pr.cond.Signal() }() - ticker := pr.clock.NewTicker(progressRequestPeriod) - defer ticker.Stop() + timer := pr.clock.NewTimer(progressRequestPeriod) + defer timer.Stop() for { stopped := func() bool { - pr.mux.RLock() - defer pr.mux.RUnlock() + pr.mux.Lock() + defer pr.mux.Unlock() for pr.waiting == 0 && !pr.stopped { pr.cond.Wait() } @@ -94,15 +94,17 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { } select { - case <-ticker.C(): + case <-timer.C(): shouldRequest := func() bool { - pr.mux.RLock() - defer pr.mux.RUnlock() + pr.mux.Lock() + defer pr.mux.Unlock() return pr.waiting > 0 && !pr.stopped }() if !shouldRequest { + timer.Reset(0) continue } + timer.Reset(progressRequestPeriod) err := pr.requestWatchProgress(ctx) if err != nil { klog.V(4).InfoS("Error requesting bookmark", "err", err) @@ -124,5 +126,4 @@ func (pr *conditionalProgressRequester) Remove() { pr.mux.Lock() defer pr.mux.Unlock() pr.waiting -= 1 - pr.cond.Signal() }