diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index dc1e9456eb7..82a8f3065ba 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -263,7 +263,7 @@ type Cacher struct { // Defines a time budget that can be spend on waiting for not-ready watchers // while dispatching event before shutting them down. - dispatchTimeoutBudget *timeBudget + dispatchTimeoutBudget timeBudget // Handling graceful termination. stopLock sync.RWMutex diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 5f5b0a1838f..505d306b3fb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -998,6 +998,14 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) { wg.Wait() } +type fakeTimeBudget struct{} + +func (f *fakeTimeBudget) takeAvailable() time.Duration { + return 2 * time.Second +} + +func (f *fakeTimeBudget) returnUnused(_ time.Duration) {} + func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) @@ -1010,7 +1018,19 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { cacher.ready.wait() // Ensure there is some budget for slowing down processing. - cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond) + // When using the official `timeBudgetImpl` we were observing flakiness + // due under the following conditions: + // 1) the watch w1 is blocked, so we were consuming the whole budget once + // its buffer was filled in (10 items) + // 2) the budget is refreshed once per second, so it basically wasn't + // happening in the test at all + // 3) if the test was cpu-starved and we weren't able to consume events + // from w2 ResultCh it could have happened that its buffer was also + // filling in and given we no longer had timeBudget (consumed in (1)) + // trying to put next item was simply breaking the watch + // Using fakeTimeBudget gives us always a budget to wait and have a test + // pick up something from ResultCh in the meantime. + cacher.dispatchTimeoutBudget = &fakeTimeBudget{} makePod := func(i int) *examplev1.Pod { return &examplev1.Pod{ @@ -1055,8 +1075,6 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { shouldContinue = false break } - // Ensure there is some budget for fast watcher after slower one is blocked. - cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond) if event.Type == watch.Added { eventsCount++ if eventsCount == totalPods { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go index 3de6cbc842f..2eb0fed32d9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go @@ -39,7 +39,12 @@ const ( // NOTE: It's not recommended to be used concurrently from multiple threads - // if first user takes the whole timeout, the second one will get 0 timeout // even though the first one may return something later. -type timeBudget struct { +type timeBudget interface { + takeAvailable() time.Duration + returnUnused(unused time.Duration) +} + +type timeBudgetImpl struct { sync.Mutex budget time.Duration @@ -47,8 +52,8 @@ type timeBudget struct { maxBudget time.Duration } -func newTimeBudget(stopCh <-chan struct{}) *timeBudget { - result := &timeBudget{ +func newTimeBudget(stopCh <-chan struct{}) timeBudget { + result := &timeBudgetImpl{ budget: time.Duration(0), refresh: refreshPerSecond, maxBudget: maxBudget, @@ -57,7 +62,7 @@ func newTimeBudget(stopCh <-chan struct{}) *timeBudget { return result } -func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) { +func (t *timeBudgetImpl) periodicallyRefresh(stopCh <-chan struct{}) { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { @@ -74,7 +79,7 @@ func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) { } } -func (t *timeBudget) takeAvailable() time.Duration { +func (t *timeBudgetImpl) takeAvailable() time.Duration { t.Lock() defer t.Unlock() result := t.budget @@ -82,7 +87,7 @@ func (t *timeBudget) takeAvailable() time.Duration { return result } -func (t *timeBudget) returnUnused(unused time.Duration) { +func (t *timeBudgetImpl) returnUnused(unused time.Duration) { t.Lock() defer t.Unlock() if unused < 0 { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go index 1276a7e541c..54cc63646cb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go @@ -22,7 +22,7 @@ import ( ) func TestTimeBudget(t *testing.T) { - budget := &timeBudget{ + budget := &timeBudgetImpl{ budget: time.Duration(0), maxBudget: time.Duration(200), }