From 7c0e9cda461e176959866b9c2d03b00e817e9b76 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 24 May 2024 10:10:27 +0200 Subject: [PATCH] Use timer instead of ticker in progress requestor Ticker behaves differently from what we want, we need a stable period interval, but ticker doesn't provide that. From NewTicker docstring: ``` The ticker will adjust the time interval or drop ticks to make up for slow receivers. ``` Unfortunatelly there is no way to test it as the FakeClock doesn't follow the real ticker behavior. --- .../pkg/storage/cacher/watch_cache_test.go | 43 ++++++++++++------- .../pkg/storage/cacher/watch_progress.go | 9 ++-- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index c3ddffa9125..7fc919e0681 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -140,30 +140,41 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { type immediateTickerFactory struct{} -func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker { - return &immediateTicker{stopCh: make(chan struct{})} +func (t *immediateTickerFactory) NewTimer(d time.Duration) clock.Timer { + timer := immediateTicker{ + c: make(chan time.Time), + } + timer.Reset(d) + return &timer } type immediateTicker struct { - stopCh chan struct{} + c chan time.Time +} + +func (t *immediateTicker) Reset(d time.Duration) (active bool) { + select { + case <-t.c: + active = true + default: + } + go func() { + t.c <- time.Now() + }() + return active } func (t *immediateTicker) C() <-chan time.Time { - ch := make(chan time.Time) - go func() { - for { - select { - case ch <- time.Now(): - case <-t.stopCh: - return - } - } - }() - return ch + return t.c } -func (t *immediateTicker) Stop() { - close(t.stopCh) +func (t *immediateTicker) Stop() bool { + select { + case <-t.c: + return true + default: + return false + } } func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go index 13f50bc187d..5e8d7d1f4b7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go @@ -49,7 +49,7 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester type WatchProgressRequester func(ctx context.Context) error type TickerFactory interface { - NewTicker(time.Duration) clock.Ticker + NewTimer(time.Duration) clock.Timer } // conditionalProgressRequester will request progress notification if there @@ -78,8 +78,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { pr.stopped = true pr.cond.Signal() }() - ticker := pr.clock.NewTicker(progressRequestPeriod) - defer ticker.Stop() + timer := pr.clock.NewTimer(progressRequestPeriod) + defer timer.Stop() for { stopped := func() bool { pr.mux.RLock() @@ -94,7 +94,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) { } select { - case <-ticker.C(): + case <-timer.C(): + timer.Reset(progressRequestPeriod) shouldRequest := func() bool { pr.mux.RLock() defer pr.mux.RUnlock()