diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 03efddb5bc7..3eeba7103ec 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -342,7 +342,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { }, // TODO: Figure out the correct value for the buffer size. incoming: make(chan watchCacheEvent, 100), - dispatchTimeoutBudget: newTimeBudget(stopCh), + dispatchTimeoutBudget: newTimeBudget(), // We need to (potentially) stop both: // - wait.Until go-routine // - reflector.ListAndWatch diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go index 2eb0fed32d9..2e14a782359 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget.go @@ -19,6 +19,8 @@ package cacher import ( "sync" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) const ( @@ -46,42 +48,39 @@ type timeBudget interface { type timeBudgetImpl struct { sync.Mutex - budget time.Duration - - refresh time.Duration + clock clock.Clock + budget time.Duration maxBudget time.Duration + refresh time.Duration + // last store last access time + last time.Time } -func newTimeBudget(stopCh <-chan struct{}) timeBudget { +func newTimeBudget() timeBudget { result := &timeBudgetImpl{ + clock: clock.RealClock{}, budget: time.Duration(0), refresh: refreshPerSecond, maxBudget: maxBudget, } - go result.periodicallyRefresh(stopCh) + result.last = result.clock.Now() return result } -func (t *timeBudgetImpl) periodicallyRefresh(stopCh <-chan struct{}) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - t.Lock() - if t.budget = t.budget + t.refresh; t.budget > t.maxBudget { - t.budget = t.maxBudget - } - t.Unlock() - case <-stopCh: - return - } - } -} - func (t *timeBudgetImpl) takeAvailable() time.Duration { t.Lock() defer t.Unlock() + // budget accumulated since last access + now := t.clock.Now() + acc := now.Sub(t.last).Seconds() * t.refresh.Seconds() + if acc < 0 { + acc = 0 + } + // update current budget and store the current time + if t.budget = t.budget + time.Duration(acc*1e9); t.budget > t.maxBudget { + t.budget = t.maxBudget + } + t.last = now result := t.budget t.budget = time.Duration(0) return result @@ -94,6 +93,8 @@ func (t *timeBudgetImpl) returnUnused(unused time.Duration) { // We used more than allowed. return } + // add the unused time directly to the budget + // takeAvailable() will take into account the elapsed time if t.budget = t.budget + unused; t.budget > t.maxBudget { t.budget = t.maxBudget } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go index 54cc63646cb..5bd7f44a58b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/time_budget_test.go @@ -19,35 +19,68 @@ package cacher import ( "testing" "time" + + "k8s.io/apimachinery/pkg/util/clock" ) func TestTimeBudget(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + budget := &timeBudgetImpl{ + clock: fakeClock, budget: time.Duration(0), - maxBudget: time.Duration(200), + maxBudget: 200 * time.Millisecond, + refresh: 50 * time.Millisecond, + last: fakeClock.Now(), } if res := budget.takeAvailable(); res != time.Duration(0) { t.Errorf("Expected: %v, got: %v", time.Duration(0), res) } - budget.budget = time.Duration(100) - if res := budget.takeAvailable(); res != time.Duration(100) { - t.Errorf("Expected: %v, got: %v", time.Duration(100), res) + + // wait for longer than the maxBudget + nextTime := time.Now().Add(10 * time.Second) + fakeClock.SetTime(nextTime) + if res := budget.takeAvailable(); res != budget.maxBudget { + t.Errorf("Expected: %v, got: %v", budget.maxBudget, res) } - if res := budget.takeAvailable(); res != time.Duration(0) { - t.Errorf("Expected: %v, got: %v", time.Duration(0), res) + // add two refresh intervals to accumulate 2*refresh durations + nextTime = nextTime.Add(2 * time.Second) + fakeClock.SetTime(nextTime) + if res := budget.takeAvailable(); res != 2*budget.refresh { + t.Errorf("Expected: %v, got: %v", 2*budget.refresh, res) } - budget.returnUnused(time.Duration(50)) - if res := budget.takeAvailable(); res != time.Duration(50) { - t.Errorf("Expected: %v, got: %v", time.Duration(50), res) + // return one refresh duration to have only one refresh duration available + // we didn't advanced on time yet + budget.returnUnused(budget.refresh) + if res := budget.takeAvailable(); res != budget.refresh { + t.Errorf("Expected: %v, got: %v", budget.refresh, res) } - budget.budget = time.Duration(100) + + // return a negative value to the budget + // we didn't advanced on time yet budget.returnUnused(-time.Duration(50)) - if res := budget.takeAvailable(); res != time.Duration(100) { - t.Errorf("Expected: %v, got: %v", time.Duration(100), res) + if res := budget.takeAvailable(); res != time.Duration(0) { + t.Errorf("Expected: %v, got: %v", time.Duration(0), res) } - // test overflow. - budget.returnUnused(time.Duration(500)) - if res := budget.takeAvailable(); res != time.Duration(200) { - t.Errorf("Expected: %v, got: %v", time.Duration(200), res) + // handle back in time problem with an empty budget + nextTime = nextTime.Add(-2 * time.Minute) + fakeClock.SetTime(nextTime) + if res := budget.takeAvailable(); res != time.Duration(0) { + t.Errorf("Expected: %v, got: %v", time.Duration(0), res) + } + // wait for longer than the maxBudget + // verify that adding a negative value didn't affected + nextTime = nextTime.Add(10 * time.Minute) + fakeClock.SetTime(nextTime) + if res := budget.takeAvailable(); res != budget.maxBudget { + t.Errorf("Expected: %v, got: %v", budget.maxBudget, res) + } + + // handle back in time problem with time on the budget + budget.returnUnused(10 * time.Second) + nextTime = nextTime.Add(-2 * time.Minute) + fakeClock.SetTime(nextTime) + if res := budget.takeAvailable(); res != budget.maxBudget { + t.Errorf("Expected: %v, got: %v", budget.maxBudget, res) } }