Fix TestConditionalProgressRequester and TestWaitUntilFreshAndListTimeout flakes

This commit is contained in:
Marek Siarkowicz 2023-07-12 13:36:51 +02:00
parent a8b90c9008
commit c1decb6763
3 changed files with 57 additions and 11 deletions

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
)
@ -118,10 +119,9 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
wc := &testWatchCache{}
wc.bookmarkRevision = make(chan int64, 1)
wc.stopCh = make(chan struct{})
clock := testingclock.NewFakeClock(time.Now())
pr := newConditionalProgressRequester(wc.RequestWatchProgress, clock)
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{})
go pr.Run(wc.stopCh)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock, schema.GroupResource{Resource: "pods"}, pr)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
// To preserve behavior of tests that assume a given capacity,
// resize it to th expected size.
wc.capacity = capacity
@ -132,6 +132,34 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
return wc
}
type immediateTickerFactory struct{}
func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker {
return &immediateTicker{stopCh: make(chan struct{})}
}
type immediateTicker struct {
stopCh chan struct{}
}
func (t *immediateTicker) C() <-chan time.Time {
ch := make(chan time.Time)
go func() {
for {
select {
case ch <- time.Now():
case <-t.stopCh:
return
}
}
}()
return ch
}
func (t *immediateTicker) Stop() {
close(t.stopCh)
}
func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
go func() {
select {

View File

@ -34,7 +34,7 @@ const (
progressRequestPeriod = 100 * time.Millisecond
)
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock clock.WithTicker) *conditionalProgressRequester {
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory) *conditionalProgressRequester {
pr := &conditionalProgressRequester{
clock: clock,
requestWatchProgress: requestWatchProgress,
@ -45,10 +45,14 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester
type WatchProgressRequester func(ctx context.Context) error
type TickerFactory interface {
NewTicker(time.Duration) clock.Ticker
}
// conditionalProgressRequester will request progress notification if there
// is a request waiting for watch cache to be fresh.
type conditionalProgressRequester struct {
clock clock.WithTicker
clock TickerFactory
requestWatchProgress WatchProgressRequester
mux sync.RWMutex

View File

@ -47,6 +47,11 @@ func TestConditionalProgressRequester(t *testing.T) {
var wantRequestsSent int32
var requestsSent int32
logger.Info("Wait for ticker to be created")
for !clock.HasWaiters() {
time.Sleep(pollPeriod)
}
logger.Info("No progress requests if no-one is waiting")
clock.Step(progressRequestPeriod * 2)
@ -54,11 +59,20 @@ func TestConditionalProgressRequester(t *testing.T) {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
}
logger.Info("Adding allows progress request to be sent every period")
logger.Info("Adding waiters allows progress request to be sent")
pr.Add()
wantRequestsSent++
if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
}
logger.Info("Periodically request progress to be sent every period")
for wantRequestsSent < 10 {
clock.Step(progressRequestPeriod)
wantRequestsSent++
@ -67,7 +81,7 @@ func TestConditionalProgressRequester(t *testing.T) {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
}
}
pr.Remove()
@ -78,7 +92,7 @@ func TestConditionalProgressRequester(t *testing.T) {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
}
logger.Info("No progress after stopping")
@ -87,7 +101,7 @@ func TestConditionalProgressRequester(t *testing.T) {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
}
pr.Add()
clock.Step(progressRequestPeriod * 2)
@ -95,7 +109,7 @@ func TestConditionalProgressRequester(t *testing.T) {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
t.Fatalf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
}
}