Use timer instead of ticker in progress requestor

Ticker behaves differently from what we want, we need a stable period
interval, but ticker doesn't provide that. From NewTicker docstring:
```
The ticker will adjust the time interval or drop ticks to make up for slow receivers.
```

Unfortunatelly there is no way to test it as the FakeClock doesn't
follow the real ticker behavior.
This commit is contained in:
Marek Siarkowicz 2024-05-24 10:10:27 +02:00
parent 3332eef27d
commit 7c0e9cda46
2 changed files with 32 additions and 20 deletions

View File

@ -140,30 +140,41 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
type immediateTickerFactory struct{} type immediateTickerFactory struct{}
func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker { func (t *immediateTickerFactory) NewTimer(d time.Duration) clock.Timer {
return &immediateTicker{stopCh: make(chan struct{})} timer := immediateTicker{
c: make(chan time.Time),
}
timer.Reset(d)
return &timer
} }
type immediateTicker struct { type immediateTicker struct {
stopCh chan struct{} c chan time.Time
}
func (t *immediateTicker) Reset(d time.Duration) (active bool) {
select {
case <-t.c:
active = true
default:
}
go func() {
t.c <- time.Now()
}()
return active
} }
func (t *immediateTicker) C() <-chan time.Time { func (t *immediateTicker) C() <-chan time.Time {
ch := make(chan time.Time) return t.c
go func() {
for {
select {
case ch <- time.Now():
case <-t.stopCh:
return
}
}
}()
return ch
} }
func (t *immediateTicker) Stop() { func (t *immediateTicker) Stop() bool {
close(t.stopCh) select {
case <-t.c:
return true
default:
return false
}
} }
func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error { func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {

View File

@ -49,7 +49,7 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester
type WatchProgressRequester func(ctx context.Context) error type WatchProgressRequester func(ctx context.Context) error
type TickerFactory interface { type TickerFactory interface {
NewTicker(time.Duration) clock.Ticker NewTimer(time.Duration) clock.Timer
} }
// conditionalProgressRequester will request progress notification if there // conditionalProgressRequester will request progress notification if there
@ -78,8 +78,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
pr.stopped = true pr.stopped = true
pr.cond.Signal() pr.cond.Signal()
}() }()
ticker := pr.clock.NewTicker(progressRequestPeriod) timer := pr.clock.NewTimer(progressRequestPeriod)
defer ticker.Stop() defer timer.Stop()
for { for {
stopped := func() bool { stopped := func() bool {
pr.mux.RLock() pr.mux.RLock()
@ -94,7 +94,8 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
} }
select { select {
case <-ticker.C(): case <-timer.C():
timer.Reset(progressRequestPeriod)
shouldRequest := func() bool { shouldRequest := func() bool {
pr.mux.RLock() pr.mux.RLock()
defer pr.mux.RUnlock() defer pr.mux.RUnlock()