From c1decb6763d2abf76d96aee8641ad56a23e0ba52 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 12 Jul 2023 13:36:51 +0200 Subject: [PATCH] Fix TestConditionalProgressRequester and TestWaitUntilFreshAndListTimeout flakes --- .../pkg/storage/cacher/watch_cache_test.go | 34 +++++++++++++++++-- .../pkg/storage/cacher/watch_progress.go | 8 +++-- .../pkg/storage/cacher/watch_progress_test.go | 26 ++++++++++---- 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 1a4c071e9f0..3a772f347f7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/tools/cache" + "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" ) @@ -118,10 +119,9 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { wc := &testWatchCache{} wc.bookmarkRevision = make(chan int64, 1) wc.stopCh = make(chan struct{}) - clock := testingclock.NewFakeClock(time.Now()) - pr := newConditionalProgressRequester(wc.RequestWatchProgress, clock) + pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}) go pr.Run(wc.stopCh) - wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock, schema.GroupResource{Resource: "pods"}, pr) + wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity @@ -132,6 +132,34 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { return wc } +type immediateTickerFactory struct{} + +func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker { + return &immediateTicker{stopCh: make(chan struct{})} +} + +type immediateTicker struct { + stopCh chan struct{} +} + +func (t *immediateTicker) C() <-chan time.Time { + ch := make(chan time.Time) + go func() { + for { + select { + case ch <- time.Now(): + case <-t.stopCh: + return + } + } + }() + return ch +} + +func (t *immediateTicker) Stop() { + close(t.stopCh) +} + func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error { go func() { select { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go index 332c27b5d2c..f44ca9325b8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress.go @@ -34,7 +34,7 @@ const ( progressRequestPeriod = 100 * time.Millisecond ) -func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock clock.WithTicker) *conditionalProgressRequester { +func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester { pr := &conditionalProgressRequester{ clock: clock, requestWatchProgress: requestWatchProgress, @@ -45,10 +45,14 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester type WatchProgressRequester func(ctx context.Context) error +type TickerFactory interface { + NewTicker(time.Duration) clock.Ticker +} + // conditionalProgressRequester will request progress notification if there // is a request waiting for watch cache to be fresh. type conditionalProgressRequester struct { - clock clock.WithTicker + clock TickerFactory requestWatchProgress WatchProgressRequester mux sync.RWMutex diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go index e8cc13c3c50..e004ed35125 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_progress_test.go @@ -47,6 +47,11 @@ func TestConditionalProgressRequester(t *testing.T) { var wantRequestsSent int32 var requestsSent int32 + logger.Info("Wait for ticker to be created") + for !clock.HasWaiters() { + time.Sleep(pollPeriod) + } + logger.Info("No progress requests if no-one is waiting") clock.Step(progressRequestPeriod * 2) @@ -54,11 +59,20 @@ func TestConditionalProgressRequester(t *testing.T) { requestsSent = pr.progressRequestsSentCount.Load() return requestsSent == wantRequestsSent }); err != nil { - t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) + t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) } - logger.Info("Adding allows progress request to be sent every period") + logger.Info("Adding waiters allows progress request to be sent") pr.Add() + wantRequestsSent++ + if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool { + requestsSent = pr.progressRequestsSentCount.Load() + return requestsSent == wantRequestsSent + }); err != nil { + t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) + } + + logger.Info("Periodically request progress to be sent every period") for wantRequestsSent < 10 { clock.Step(progressRequestPeriod) wantRequestsSent++ @@ -67,7 +81,7 @@ func TestConditionalProgressRequester(t *testing.T) { requestsSent = pr.progressRequestsSentCount.Load() return requestsSent == wantRequestsSent }); err != nil { - t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) + t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) } } pr.Remove() @@ -78,7 +92,7 @@ func TestConditionalProgressRequester(t *testing.T) { requestsSent = pr.progressRequestsSentCount.Load() return requestsSent == wantRequestsSent }); err != nil { - t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) + t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) } logger.Info("No progress after stopping") @@ -87,7 +101,7 @@ func TestConditionalProgressRequester(t *testing.T) { requestsSent = pr.progressRequestsSentCount.Load() return requestsSent == wantRequestsSent }); err != nil { - t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) + t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) } pr.Add() clock.Step(progressRequestPeriod * 2) @@ -95,7 +109,7 @@ func TestConditionalProgressRequester(t *testing.T) { requestsSent = pr.progressRequestsSentCount.Load() return requestsSent == wantRequestsSent }); err != nil { - t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) + t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent) } }