From ab2cdceca1a1d16d17a3984a38443ea77c1316cd Mon Sep 17 00:00:00 2001
From: Laura Lorenz
Date: Sat, 26 Oct 2024 02:17:23 +0000
Subject: [PATCH] Maintain 10 minute recovery threshold for container backoff

Signed-off-by: Laura Lorenz

Kubernetes-commit: a0b83a774102a6d8ce03ce03c9d0431b44559019
---
 util/flowcontrol/backoff.go      |  28 ++++---
 util/flowcontrol/backoff_test.go | 125 +++++++++++++++++++++++++++++++
 2 files changed, 143 insertions(+), 10 deletions(-)

diff --git a/util/flowcontrol/backoff.go b/util/flowcontrol/backoff.go
index 82e4c4c4..899b8e34 100644
--- a/util/flowcontrol/backoff.go
+++ b/util/flowcontrol/backoff.go
@@ -32,7 +32,12 @@ type backoffEntry struct {
 
 type Backoff struct {
 	sync.RWMutex
-	Clock           clock.Clock
+	Clock clock.Clock
+	// HasExpiredFunc controls the logic that determines whether the backoff
+	// counter should be reset, and when to GC old backoff entries. If nil, the
+	// default hasExpired function will restart the backoff factor to the
+	// beginning after observing time has passed at least equal to 2*maxDuration
+	HasExpiredFunc  func(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool
 	defaultDuration time.Duration
 	maxDuration     time.Duration
 	perItemBackoff  map[string]*backoffEntry
@@ -93,7 +98,7 @@ func (p *Backoff) Next(id string, eventTime time.Time) {
 	p.Lock()
 	defer p.Unlock()
 	entry, ok := p.perItemBackoff[id]
-	if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+	if !ok || p.hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
 		entry = p.initEntryUnsafe(id)
 		entry.backoff += p.jitter(entry.backoff)
 	} else {
@@ -119,7 +124,7 @@ func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
 	if !ok {
 		return false
 	}
-	if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+	if p.hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
 		return false
 	}
 	return p.Clock.Since(eventTime) < entry.backoff
@@ -133,21 +138,21 @@ func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
 	if !ok {
 		return false
 	}
-	if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+	if p.hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
 		return false
 	}
 	return eventTime.Sub(entry.lastUpdate) < entry.backoff
 }
 
-// Garbage collect records that have aged past maxDuration. Backoff users are expected
-// to invoke this periodically.
+// Garbage collect records that have aged past their expiration, which defaults
+// to 2*maxDuration (see hasExpired godoc). Backoff users are expected to invoke
+// this periodically.
 func (p *Backoff) GC() {
 	p.Lock()
 	defer p.Unlock()
 	now := p.Clock.Now()
 	for id, entry := range p.perItemBackoff {
-		if now.Sub(entry.lastUpdate) > p.maxDuration*2 {
-			// GC when entry has not been updated for 2*maxDuration
+		if p.hasExpired(now, entry.lastUpdate, p.maxDuration) {
 			delete(p.perItemBackoff, id)
 		}
 	}
@@ -174,7 +179,10 @@ func (p *Backoff) jitter(delay time.Duration) time.Duration {
 	return time.Duration(p.rand.Float64() * p.maxJitterFactor * float64(delay))
 }
 
-// After 2*maxDuration we restart the backoff factor to the beginning
-func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
+// Unless an alternate function is provided, after 2*maxDuration we restart the backoff factor to the beginning
+func (p *Backoff) hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
+	if p.HasExpiredFunc != nil {
+		return p.HasExpiredFunc(eventTime, lastUpdate, maxDuration)
+	}
 	return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
 }
diff --git a/util/flowcontrol/backoff_test.go b/util/flowcontrol/backoff_test.go
index d8d23dc4..305f968d 100644
--- a/util/flowcontrol/backoff_test.go
+++ b/util/flowcontrol/backoff_test.go
@@ -125,6 +125,67 @@ func TestBackoffGC(t *testing.T) {
 	}
 }
 
+func TestAlternateBackoffGC(t *testing.T) {
+	cases := []struct {
+		name           string
+		hasExpiredFunc func(time.Time, time.Time, time.Duration) bool
+		maxDuration    time.Duration
+		nonExpiredTime time.Duration
+		expiredTime    time.Duration
+	}{
+		{
+			name:           "default GC",
+			maxDuration:    time.Duration(50 * time.Second),
+			nonExpiredTime: time.Duration(5 * time.Second),
+			expiredTime:    time.Duration(101 * time.Second),
+		},
+		{
+			name: "GC later than 2*maxDuration",
+			hasExpiredFunc: func(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
+				return eventTime.Sub(lastUpdate) >= 200*time.Second
+			},
+			maxDuration:    time.Duration(50 * time.Second),
+			nonExpiredTime: time.Duration(101 * time.Second),
+			expiredTime:    time.Duration(501 * time.Second),
+		},
+	}
+
+	for _, tt := range cases {
+		clock := testingclock.NewFakeClock(time.Now())
+		base := time.Second
+		maxDuration := tt.maxDuration
+		id := tt.name
+
+		b := NewFakeBackOff(base, maxDuration, clock)
+		if tt.hasExpiredFunc != nil {
+			b.HasExpiredFunc = tt.hasExpiredFunc
+		}
+
+		// initialize backoff
+		lastUpdate := clock.Now()
+		b.Next(id, lastUpdate)
+
+		// increment to a time within GC expiration
+		clock.Step(tt.nonExpiredTime)
+		b.GC()
+
+		// confirm we did not GC this entry
+		_, found := b.perItemBackoff[id]
+		if !found {
+			t.Errorf("[%s] expected GC to skip entry, elapsed time=%s", tt.name, clock.Since(lastUpdate))
+		}
+
+		// increment to a time beyond GC expiration
+		clock.Step(tt.expiredTime)
+		b.GC()
+		r, found := b.perItemBackoff[id]
+		if found {
+			t.Errorf("[%s] expected GC of entry after %s got entry %v", tt.name, clock.Since(lastUpdate), r)
+		}
+
+	}
+}
+
 func TestIsInBackOffSinceUpdate(t *testing.T) {
 	id := "_idIsInBackOffSinceUpdate"
 	tc := testingclock.NewFakeClock(time.Now())
@@ -250,3 +311,67 @@ func TestBackoffWithJitter(t *testing.T) {
 
 	t.Logf("exponentially backed off jittered delays: %v", delays)
 }
+
+func TestAlternateHasExpiredFunc(t *testing.T) {
+	cases := []struct {
+		name           string
+		hasExpiredFunc func(time.Time, time.Time, time.Duration) bool
+		maxDuration    time.Duration
+		nonExpiredTime time.Duration
+		expiredTime    time.Duration
+	}{
+		{
+			name:           "default expiration",
+			maxDuration:    time.Duration(50 * time.Second),
+			nonExpiredTime: time.Duration(5 * time.Second),
+			expiredTime:    time.Duration(101 * time.Second),
+		},
+		{
+			name: "expires faster than maxDuration",
+			hasExpiredFunc: func(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
+				return eventTime.Sub(lastUpdate) >= 8*time.Second
+			},
+			maxDuration:    time.Duration(50 * time.Second),
+			nonExpiredTime: time.Duration(5 * time.Second),
+			expiredTime:    time.Duration(9 * time.Second),
+		},
+	}
+
+	for _, tt := range cases {
+		clock := testingclock.NewFakeClock(time.Now())
+		base := time.Second
+		maxDuration := tt.maxDuration
+		id := tt.name
+
+		b := NewFakeBackOff(base, maxDuration, clock)
+
+		if tt.hasExpiredFunc != nil {
+			b.HasExpiredFunc = tt.hasExpiredFunc
+		}
+		// initialize backoff
+		b.Next(id, clock.Now())
+
+		// increment to a time within expiration
+		clock.Step(tt.nonExpiredTime)
+		b.Next(id, clock.Now())
+
+		// confirm we did a backoff
+		w := b.Get(id)
+		if w < base*2 {
+			t.Errorf("case %v: backoff object has not incremented like expected: want %s, got %s", tt.name, base*2, w)
+		}
+
+		// increment to a time beyond expiration
+		clock.Step(tt.expiredTime)
+		b.Next(id, clock.Now())
+
+		// confirm we have reset the backoff to base
+		w = b.Get(id)
+		if w != base {
+			t.Errorf("case %v: hasexpired value: expected %s (backoff to be reset to initial), got %s", tt.name, base, w)
+		}
+
+		clock.SetTime(time.Now())
+		b.Reset(id)
+	}
+}