From ec93e854ca0924b7f984641b0bbdefc12b269f2b Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 8 Nov 2021 19:30:03 -0500 Subject: [PATCH] client-go: add jitter to flowcontrol.Backoff --- .../client-go/util/flowcontrol/backoff.go | 47 ++++++++++++--- .../util/flowcontrol/backoff_test.go | 57 +++++++++++++++++++ 2 files changed, 96 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go b/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go index ca60e107f49..3ef88dbdb89 100644 --- a/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go +++ b/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go @@ -17,6 +17,7 @@ limitations under the License. package flowcontrol import ( + "math/rand" "sync" "time" @@ -36,23 +37,43 @@ type Backoff struct { defaultDuration time.Duration maxDuration time.Duration perItemBackoff map[string]*backoffEntry + rand *rand.Rand + + // maxJitterFactor adds jitter to the exponentially backed off delay. + // if maxJitterFactor is zero, no jitter is added to the delay in + // order to maintain current behavior. + maxJitterFactor float64 } func NewFakeBackOff(initial, max time.Duration, tc *testingclock.FakeClock) *Backoff { - return &Backoff{ - perItemBackoff: map[string]*backoffEntry{}, - Clock: tc, - defaultDuration: initial, - maxDuration: max, - } + return newBackoff(tc, initial, max, 0.0) } func NewBackOff(initial, max time.Duration) *Backoff { + return NewBackOffWithJitter(initial, max, 0.0) +} + +func NewFakeBackOffWithJitter(initial, max time.Duration, tc *testingclock.FakeClock, maxJitterFactor float64) *Backoff { + return newBackoff(tc, initial, max, maxJitterFactor) +} + +func NewBackOffWithJitter(initial, max time.Duration, maxJitterFactor float64) *Backoff { + clock := clock.RealClock{} + return newBackoff(clock, initial, max, maxJitterFactor) +} + +func newBackoff(clock clock.Clock, initial, max time.Duration, maxJitterFactor float64) *Backoff { + var random *rand.Rand + if maxJitterFactor > 0 { + random = rand.New(rand.NewSource(clock.Now().UnixNano())) + } return &Backoff{ perItemBackoff: map[string]*backoffEntry{}, - Clock: clock.RealClock{}, + Clock: clock, defaultDuration: initial, maxDuration: max, + maxJitterFactor: maxJitterFactor, + rand: random, } } @@ -75,8 +96,10 @@ func (p *Backoff) Next(id string, eventTime time.Time) { entry, ok := p.perItemBackoff[id] if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) { entry = p.initEntryUnsafe(id) + entry.backoff += p.jitter(entry.backoff) } else { - delay := entry.backoff * 2 // exponential + delay := entry.backoff * 2 // exponential + delay += p.jitter(entry.backoff) // add some jitter to the delay entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration))) } entry.lastUpdate = p.Clock.Now() @@ -144,6 +167,14 @@ func (p *Backoff) initEntryUnsafe(id string) *backoffEntry { return entry } +func (p *Backoff) jitter(delay time.Duration) time.Duration { + if p.rand == nil { + return 0 + } + + return time.Duration(p.rand.Float64() * p.maxJitterFactor * float64(delay)) +} + // After 2*maxDuration we restart the backoff factor to the beginning func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool { return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration diff --git a/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go b/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go index 08f60d0eefc..d8d23dc4a89 100644 --- a/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go +++ b/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go @@ -193,3 +193,60 @@ func TestIsInBackOffSinceUpdate(t *testing.T) { } } } + +func TestBackoffWithJitter(t *testing.T) { + id := "_idJitter" + tc := testingclock.NewFakeClock(time.Now()) + + // test setup: we show 11 iterations, series of delays we expect with + // a jitter factor of zero each time: + // 100ms 200ms 400ms 800ms 1.6s 3.2s 06.4s 12.8s 25.6s 51.2s 1m42s + // and with jitter factor of 0.1 (max) each time: + // 110ms 231ms 485ms 1.0s 2.1s 4.4s 09.4s 19.8s 41.6s 1m27s 2m6s + // + // with the following configuration, it is guaranteed that the maximum delay + // will be reached even though we are unlucky and get jitter factor of zero. + // This ensures that this test covers the code path for checking whether + // maximum delay has been reached with jitter enabled. + initial := 100 * time.Millisecond + maxDuration := time.Minute + maxJitterFactor := 0.1 + attempts := 10 + + b := NewFakeBackOffWithJitter(initial, maxDuration, tc, maxJitterFactor) + + assert := func(t *testing.T, factor int, prevDelayGot, curDelayGot time.Duration) { + low := time.Duration((float64(prevDelayGot) * float64(factor))) + high := low + time.Duration(maxJitterFactor*float64(prevDelayGot)) + if !((curDelayGot > low && curDelayGot <= high) || curDelayGot == maxDuration) { + t.Errorf("jittered delay not within range: (%s - %s], but got %s", low, high, curDelayGot) + } + } + + delays := make([]time.Duration, 0) + next := func() time.Duration { + tc.Step(initial) + b.Next(id, tc.Now()) + + delay := b.Get(id) + delays = append(delays, delay) + return delay + } + + if got := b.Get(id); got != 0 { + t.Errorf("expected a zero wait durtion, but got: %s", got) + } + + delayGot := next() + assert(t, 1, initial, delayGot) + + prevDelayGot := delayGot + for i := 0; i < attempts; i++ { + delayGot = next() + assert(t, 2, prevDelayGot, delayGot) + + prevDelayGot = delayGot + } + + t.Logf("exponentially backed off jittered delays: %v", delays) +}