From 7c0e9cda461e176959866b9c2d03b00e817e9b76 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Fri, 24 May 2024 10:10:27 +0200
Subject: [PATCH 1/4] Use timer instead of ticker in progress requester

Ticker behaves differently from what we want: we need a stable interval
between progress requests, and ticker doesn't provide that. From the
NewTicker docstring:

```
The ticker will adjust the time interval or drop ticks to make up for
slow receivers.
```

Unfortunately there is no way to test this, as the FakeClock doesn't
follow the real ticker behavior.
---
 .../pkg/storage/cacher/watch_cache_test.go | 43 ++++++++++++-------
 .../pkg/storage/cacher/watch_progress.go   |  9 ++--
 2 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
index c3ddffa9125..7fc919e0681 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -140,30 +140,41 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 
 type immediateTickerFactory struct{}
 
-func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker {
-	return &immediateTicker{stopCh: make(chan struct{})}
+func (t *immediateTickerFactory) NewTimer(d time.Duration) clock.Timer {
+	timer := immediateTicker{
+		c: make(chan time.Time),
+	}
+	timer.Reset(d)
+	return &timer
 }
 
 type immediateTicker struct {
-	stopCh chan struct{}
+	c chan time.Time
+}
+
+func (t *immediateTicker) Reset(d time.Duration) (active bool) {
+	select {
+	case <-t.c:
+		active = true
+	default:
+	}
+	go func() {
+		t.c <- time.Now()
+	}()
+	return active
 }
 
 func (t *immediateTicker) C() <-chan time.Time {
-	ch := make(chan time.Time)
-	go func() {
-		for {
-			select {
-			case ch <- time.Now():
-			case <-t.stopCh:
-				return
-			}
-		}
-	}()
-	return ch
+	return t.c
 }
 
-func (t *immediateTicker) Stop() {
-	close(t.stopCh)
+func (t *immediateTicker) Stop() bool {
+	select {
+	case <-t.c:
+		return true
+	default:
+		return false
+	}
 }
 
 func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
index 13f50bc187d..5e8d7d1f4b7 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
@@ -49,7 +49,7 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester
 type WatchProgressRequester func(ctx context.Context) error
 
 type TickerFactory interface {
-	NewTicker(time.Duration) clock.Ticker
+	NewTimer(time.Duration) clock.Timer
 }
 
 // conditionalProgressRequester will request progress notification if there
@@ -78,8 +78,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 		pr.stopped = true
 		pr.cond.Signal()
 	}()
-	ticker := pr.clock.NewTicker(progressRequestPeriod)
-	defer ticker.Stop()
+	timer := pr.clock.NewTimer(progressRequestPeriod)
+	defer timer.Stop()
 	for {
 		stopped := func() bool {
 			pr.mux.RLock()
@@ -94,7 +94,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 		}
 
 		select {
-		case <-ticker.C():
+		case <-timer.C():
+			timer.Reset(progressRequestPeriod)
 			shouldRequest := func() bool {
 				pr.mux.RLock()
 				defer pr.mux.RUnlock()
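To see the behavior the commit message describes, here is a minimal standalone sketch (using the stdlib `time` package rather than `k8s.io/utils/clock`; not part of the patch). A timer fires only when the loop explicitly re-arms it, so the loop controls the spacing between rounds of work; a ticker keeps its own schedule and, per the quoted docstring, adjusts the interval or drops ticks when the receiver is slow:

```go
package main

import (
	"fmt"
	"time"
)

func slowWork() { time.Sleep(150 * time.Millisecond) } // deliberately slower than the period

func main() {
	const period = 100 * time.Millisecond

	// Timer: nothing fires until Reset explicitly re-arms it, so the loop
	// controls the spacing between rounds of work.
	timer := time.NewTimer(period)
	last := time.Now()
	for i := 0; i < 3; i++ {
		<-timer.C
		fmt.Println("timer gap:", time.Since(last).Round(10*time.Millisecond))
		last = time.Now()
		slowWork()
		timer.Reset(period) // next firing a full period after this round
	}
	timer.Stop()

	// Ticker: the runtime keeps its own schedule and adjusts the interval
	// or drops ticks for a slow receiver, so the observed gaps collapse to
	// roughly the work duration instead of a stable period.
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	last = time.Now()
	for i := 0; i < 3; i++ {
		<-ticker.C
		fmt.Println("ticker gap:", time.Since(last).Round(10*time.Millisecond))
		last = time.Now()
		slowWork()
	}
}
```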
From e6b54149bb42d58301e34872ebbcf2ea4bcfb474 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Fri, 24 May 2024 10:13:52 +0200
Subject: [PATCH 2/4] Don't signal when lowering number of waiting routines

The Signal is not needed: the Run loop only has to be woken when waiting
is increased, never when it is lowered.
---
 .../src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
index 5e8d7d1f4b7..0946eecc253 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
@@ -125,5 +125,4 @@ func (pr *conditionalProgressRequester) Remove() {
 	pr.mux.Lock()
 	defer pr.mux.Unlock()
 	pr.waiting -= 1
-	pr.cond.Signal()
 }

From 544ea424826ef60d703c5f4fb91b2c6a95f303aa Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Fri, 24 May 2024 10:16:48 +0200
Subject: [PATCH 3/4] Use normal Mutex instead of RWMutex

There is no benefit to using RWMutex here, as we have one reader and
multiple writers. In such cases RWMutex performs worse than a plain
Mutex.
---
 .../apiserver/pkg/storage/cacher/watch_progress.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
index 0946eecc253..4c7121f14bb 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
@@ -42,7 +42,7 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester
 		requestWatchProgress: requestWatchProgress,
 		contextMetadata:      contextMetadata,
 	}
-	pr.cond = sync.NewCond(pr.mux.RLocker())
+	pr.cond = sync.NewCond(&pr.mux)
 	return pr
 }
 
@@ -59,7 +59,7 @@ type conditionalProgressRequester struct {
 	requestWatchProgress WatchProgressRequester
 	contextMetadata      metadata.MD
 
-	mux     sync.RWMutex
+	mux     sync.Mutex
 	cond    *sync.Cond
 	waiting int
 	stopped bool
@@ -82,8 +82,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 	defer timer.Stop()
 	for {
 		stopped := func() bool {
-			pr.mux.RLock()
-			defer pr.mux.RUnlock()
+			pr.mux.Lock()
+			defer pr.mux.Unlock()
 			for pr.waiting == 0 && !pr.stopped {
 				pr.cond.Wait()
 			}
@@ -97,8 +97,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 		case <-timer.C():
 			timer.Reset(progressRequestPeriod)
 			shouldRequest := func() bool {
-				pr.mux.RLock()
-				defer pr.mux.RUnlock()
+				pr.mux.Lock()
+				defer pr.mux.Unlock()
 				return pr.waiting > 0 && !pr.stopped
 			}()
 			if !shouldRequest {
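The rule patch 2 applies is the standard condition-variable discipline: a goroutine blocked in cond.Wait only needs a Signal when its predicate may have become true. The Run loop waits for `waiting > 0 || stopped`, so Add and Stop must signal, while Remove only moves `waiting` back toward zero and can safely skip it. A self-contained sketch of that shape (simplified names; not the patched file):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type gate struct {
	mux     sync.Mutex
	cond    *sync.Cond
	waiting int
	stopped bool
}

func newGate() *gate {
	g := &gate{}
	g.cond = sync.NewCond(&g.mux)
	return g
}

// Add registers a waiter; the predicate may become true, so wake the runner.
func (g *gate) Add() {
	g.mux.Lock()
	defer g.mux.Unlock()
	g.waiting++
	g.cond.Signal()
}

// Remove unregisters a waiter; this can only keep the runner asleep,
// so no Signal is needed — the point of patch 2.
func (g *gate) Remove() {
	g.mux.Lock()
	defer g.mux.Unlock()
	g.waiting--
}

// run blocks until there is at least one waiter (or the gate is stopped).
func (g *gate) run() {
	g.mux.Lock()
	for g.waiting == 0 && !g.stopped {
		g.cond.Wait()
	}
	g.mux.Unlock()
	fmt.Println("runner woke up: a waiter arrived")
}

func main() {
	g := newGate()
	go g.run()
	time.Sleep(10 * time.Millisecond) // let the runner block in Wait
	g.Add()                           // wakes the runner
	time.Sleep(10 * time.Millisecond)
	g.Remove() // no Signal needed here
}
```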
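The performance claim in patch 3 is easy to sanity-check with a micro-benchmark sketch (hypothetical file, not part of the series): RLock/RUnlock do more bookkeeping than Lock/Unlock, so with a single reader there is no reader concurrency to pay for it. Run with `go test -bench=.`:

```go
package lockbench

import (
	"sync"
	"testing"
)

// Uncontended acquire/release of a plain Mutex.
func BenchmarkMutex(b *testing.B) {
	var mu sync.Mutex
	for i := 0; i < b.N; i++ {
		mu.Lock()
		mu.Unlock()
	}
}

// Uncontended read-acquire/release of an RWMutex; typically slower,
// since the reader path maintains extra state for writer coordination.
func BenchmarkRWMutexRead(b *testing.B) {
	var mu sync.RWMutex
	for i := 0; i < b.N; i++ {
		mu.RLock()
		mu.RUnlock()
	}
}
```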
From 98e384fcd093d3cff4767ee009f8164b83bd4753 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Fri, 24 May 2024 11:07:01 +0200
Subject: [PATCH 4/4] Reset timer to zero when no progress notify was sent

---
 .../src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
index 4c7121f14bb..087fb14e546 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go
@@ -95,15 +95,16 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 
 		select {
 		case <-timer.C():
-			timer.Reset(progressRequestPeriod)
 			shouldRequest := func() bool {
 				pr.mux.Lock()
 				defer pr.mux.Unlock()
 				return pr.waiting > 0 && !pr.stopped
 			}()
 			if !shouldRequest {
+				timer.Reset(0)
 				continue
 			}
+			timer.Reset(progressRequestPeriod)
 			err := pr.requestWatchProgress(ctx)
 			if err != nil {
 				klog.V(4).InfoS("Error requesting bookmark", "err", err)
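Patch 4 leans on a simple timer property: Reset(0) re-arms the timer to fire immediately, so an idle pass through the loop does not leave a stale deadline in place, and the full period is only armed once a progress request was actually sent. A tiny standalone check of that property (stdlib timer; not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTimer(time.Hour) // pending far in the future

	// Re-arm with zero: the timer fires on the next receive, effectively
	// immediately, instead of at the old deadline. Stop-and-drain is the
	// canonical pattern before resetting a possibly-fired timer.
	if !timer.Stop() {
		<-timer.C // drain a stale firing; not expected here
	}
	timer.Reset(0)

	start := time.Now()
	<-timer.C
	fmt.Printf("fired after %v\n", time.Since(start)) // microseconds, not an hour
}
```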