mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Fix cacher test flakiness
This commit is contained in:
parent
b2de4a6159
commit
6eb71c4089
@ -263,7 +263,7 @@ type Cacher struct {
|
||||
|
||||
// Defines a time budget that can be spend on waiting for not-ready watchers
|
||||
// while dispatching event before shutting them down.
|
||||
dispatchTimeoutBudget *timeBudget
|
||||
dispatchTimeoutBudget timeBudget
|
||||
|
||||
// Handling graceful termination.
|
||||
stopLock sync.RWMutex
|
||||
|
@ -998,6 +998,14 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type fakeTimeBudget struct{}
|
||||
|
||||
func (f *fakeTimeBudget) takeAvailable() time.Duration {
|
||||
return 2 * time.Second
|
||||
}
|
||||
|
||||
func (f *fakeTimeBudget) returnUnused(_ time.Duration) {}
|
||||
|
||||
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
@ -1010,7 +1018,19 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
||||
cacher.ready.wait()
|
||||
|
||||
// Ensure there is some budget for slowing down processing.
|
||||
cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
|
||||
// When using the official `timeBudgetImpl` we were observing flakiness
|
||||
// due under the following conditions:
|
||||
// 1) the watch w1 is blocked, so we were consuming the whole budget once
|
||||
// its buffer was filled in (10 items)
|
||||
// 2) the budget is refreshed once per second, so it basically wasn't
|
||||
// happening in the test at all
|
||||
// 3) if the test was cpu-starved and we weren't able to consume events
|
||||
// from w2 ResultCh it could have happened that its buffer was also
|
||||
// filling in and given we no longer had timeBudget (consumed in (1))
|
||||
// trying to put next item was simply breaking the watch
|
||||
// Using fakeTimeBudget gives us always a budget to wait and have a test
|
||||
// pick up something from ResultCh in the meantime.
|
||||
cacher.dispatchTimeoutBudget = &fakeTimeBudget{}
|
||||
|
||||
makePod := func(i int) *examplev1.Pod {
|
||||
return &examplev1.Pod{
|
||||
@ -1055,8 +1075,6 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
||||
shouldContinue = false
|
||||
break
|
||||
}
|
||||
// Ensure there is some budget for fast watcher after slower one is blocked.
|
||||
cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond)
|
||||
if event.Type == watch.Added {
|
||||
eventsCount++
|
||||
if eventsCount == totalPods {
|
||||
|
@ -39,7 +39,12 @@ const (
|
||||
// NOTE: It's not recommended to be used concurrently from multiple threads -
|
||||
// if first user takes the whole timeout, the second one will get 0 timeout
|
||||
// even though the first one may return something later.
|
||||
type timeBudget struct {
|
||||
type timeBudget interface {
|
||||
takeAvailable() time.Duration
|
||||
returnUnused(unused time.Duration)
|
||||
}
|
||||
|
||||
type timeBudgetImpl struct {
|
||||
sync.Mutex
|
||||
budget time.Duration
|
||||
|
||||
@ -47,8 +52,8 @@ type timeBudget struct {
|
||||
maxBudget time.Duration
|
||||
}
|
||||
|
||||
func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
|
||||
result := &timeBudget{
|
||||
func newTimeBudget(stopCh <-chan struct{}) timeBudget {
|
||||
result := &timeBudgetImpl{
|
||||
budget: time.Duration(0),
|
||||
refresh: refreshPerSecond,
|
||||
maxBudget: maxBudget,
|
||||
@ -57,7 +62,7 @@ func newTimeBudget(stopCh <-chan struct{}) *timeBudget {
|
||||
return result
|
||||
}
|
||||
|
||||
func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
|
||||
func (t *timeBudgetImpl) periodicallyRefresh(stopCh <-chan struct{}) {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
@ -74,7 +79,7 @@ func (t *timeBudget) periodicallyRefresh(stopCh <-chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *timeBudget) takeAvailable() time.Duration {
|
||||
func (t *timeBudgetImpl) takeAvailable() time.Duration {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
result := t.budget
|
||||
@ -82,7 +87,7 @@ func (t *timeBudget) takeAvailable() time.Duration {
|
||||
return result
|
||||
}
|
||||
|
||||
func (t *timeBudget) returnUnused(unused time.Duration) {
|
||||
func (t *timeBudgetImpl) returnUnused(unused time.Duration) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
if unused < 0 {
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
)
|
||||
|
||||
func TestTimeBudget(t *testing.T) {
|
||||
budget := &timeBudget{
|
||||
budget := &timeBudgetImpl{
|
||||
budget: time.Duration(0),
|
||||
maxBudget: time.Duration(200),
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user