decouple timeBudget from real clock

instead of using a goroutine refreshing the budget, obtain
the value from the last time the budget was accessed.
This commit is contained in:
Antonio Ojea 2021-07-20 13:54:59 +02:00
parent 59c0523bca
commit dd2c383060
3 changed files with 73 additions and 39 deletions

View File

@ -342,7 +342,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
},
// TODO: Figure out the correct value for the buffer size.
incoming: make(chan watchCacheEvent, 100),
dispatchTimeoutBudget: newTimeBudget(stopCh),
dispatchTimeoutBudget: newTimeBudget(),
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch

View File

@ -19,6 +19,8 @@ package cacher
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
const (
@ -46,42 +48,39 @@ type timeBudget interface {
type timeBudgetImpl struct {
sync.Mutex
budget time.Duration
refresh time.Duration
clock clock.Clock
budget time.Duration
maxBudget time.Duration
refresh time.Duration
// last store last access time
last time.Time
}
func newTimeBudget(stopCh <-chan struct{}) timeBudget {
func newTimeBudget() timeBudget {
result := &timeBudgetImpl{
clock: clock.RealClock{},
budget: time.Duration(0),
refresh: refreshPerSecond,
maxBudget: maxBudget,
}
go result.periodicallyRefresh(stopCh)
result.last = result.clock.Now()
return result
}
func (t *timeBudgetImpl) periodicallyRefresh(stopCh <-chan struct{}) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.Lock()
if t.budget = t.budget + t.refresh; t.budget > t.maxBudget {
t.budget = t.maxBudget
}
t.Unlock()
case <-stopCh:
return
}
}
}
func (t *timeBudgetImpl) takeAvailable() time.Duration {
t.Lock()
defer t.Unlock()
// budget accumulated since last access
now := t.clock.Now()
acc := now.Sub(t.last).Seconds() * t.refresh.Seconds()
if acc < 0 {
acc = 0
}
// update current budget and store the current time
if t.budget = t.budget + time.Duration(acc*1e9); t.budget > t.maxBudget {
t.budget = t.maxBudget
}
t.last = now
result := t.budget
t.budget = time.Duration(0)
return result
@ -94,6 +93,8 @@ func (t *timeBudgetImpl) returnUnused(unused time.Duration) {
// We used more than allowed.
return
}
// add the unused time directly to the budget
// takeAvailable() will take into account the elapsed time
if t.budget = t.budget + unused; t.budget > t.maxBudget {
t.budget = t.maxBudget
}

View File

@ -19,35 +19,68 @@ package cacher
import (
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
func TestTimeBudget(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
budget := &timeBudgetImpl{
clock: fakeClock,
budget: time.Duration(0),
maxBudget: time.Duration(200),
maxBudget: 200 * time.Millisecond,
refresh: 50 * time.Millisecond,
last: fakeClock.Now(),
}
if res := budget.takeAvailable(); res != time.Duration(0) {
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
}
budget.budget = time.Duration(100)
if res := budget.takeAvailable(); res != time.Duration(100) {
t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
// wait for longer than the maxBudget
nextTime := time.Now().Add(10 * time.Second)
fakeClock.SetTime(nextTime)
if res := budget.takeAvailable(); res != budget.maxBudget {
t.Errorf("Expected: %v, got: %v", budget.maxBudget, res)
}
if res := budget.takeAvailable(); res != time.Duration(0) {
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
// add two refresh intervals to accumulate 2*refresh durations
nextTime = nextTime.Add(2 * time.Second)
fakeClock.SetTime(nextTime)
if res := budget.takeAvailable(); res != 2*budget.refresh {
t.Errorf("Expected: %v, got: %v", 2*budget.refresh, res)
}
budget.returnUnused(time.Duration(50))
if res := budget.takeAvailable(); res != time.Duration(50) {
t.Errorf("Expected: %v, got: %v", time.Duration(50), res)
// return one refresh duration to have only one refresh duration available
// we didn't advanced on time yet
budget.returnUnused(budget.refresh)
if res := budget.takeAvailable(); res != budget.refresh {
t.Errorf("Expected: %v, got: %v", budget.refresh, res)
}
budget.budget = time.Duration(100)
// return a negative value to the budget
// we didn't advanced on time yet
budget.returnUnused(-time.Duration(50))
if res := budget.takeAvailable(); res != time.Duration(100) {
t.Errorf("Expected: %v, got: %v", time.Duration(100), res)
if res := budget.takeAvailable(); res != time.Duration(0) {
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
}
// test overflow.
budget.returnUnused(time.Duration(500))
if res := budget.takeAvailable(); res != time.Duration(200) {
t.Errorf("Expected: %v, got: %v", time.Duration(200), res)
// handle back in time problem with an empty budget
nextTime = nextTime.Add(-2 * time.Minute)
fakeClock.SetTime(nextTime)
if res := budget.takeAvailable(); res != time.Duration(0) {
t.Errorf("Expected: %v, got: %v", time.Duration(0), res)
}
// wait for longer than the maxBudget
// verify that adding a negative value didn't affected
nextTime = nextTime.Add(10 * time.Minute)
fakeClock.SetTime(nextTime)
if res := budget.takeAvailable(); res != budget.maxBudget {
t.Errorf("Expected: %v, got: %v", budget.maxBudget, res)
}
// handle back in time problem with time on the budget
budget.returnUnused(10 * time.Second)
nextTime = nextTime.Add(-2 * time.Minute)
fakeClock.SetTime(nextTime)
if res := budget.takeAvailable(); res != budget.maxBudget {
t.Errorf("Expected: %v, got: %v", budget.maxBudget, res)
}
}