implement backoff manager

This commit is contained in:
Harry Zhang 2020-02-04 11:34:15 -08:00
parent 4b29407945
commit fc8a39d439
3 changed files with 144 additions and 26 deletions

View File

@ -10,7 +10,10 @@ go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["wait_test.go"], srcs = ["wait_test.go"],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library"], deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
) )
go_library( go_library(
@ -21,7 +24,10 @@ go_library(
], ],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/wait", importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/wait",
importpath = "k8s.io/apimachinery/pkg/util/wait", importpath = "k8s.io/apimachinery/pkg/util/wait",
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library"], deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
) )
filegroup( filegroup(

View File

@ -19,10 +19,12 @@ package wait
import ( import (
"context" "context"
"errors" "errors"
"math"
"math/rand" "math/rand"
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
) )
@ -128,9 +130,15 @@ func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), pe
// Close stopCh to stop. f may not be invoked if stop channel is already // Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop. // closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
var t *time.Timer BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
var sawTimeout bool }
// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
var t clock.Timer
for { for {
select { select {
case <-stopCh: case <-stopCh:
@ -138,13 +146,8 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b
default: default:
} }
jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
}
if !sliding { if !sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) t = backoff.Backoff()
} }
func() { func() {
@ -153,7 +156,7 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b
}() }()
if sliding { if sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) t = backoff.Backoff()
} }
// NOTE: b/c there is no priority selection in golang // NOTE: b/c there is no priority selection in golang
@ -164,8 +167,7 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b
select { select {
case <-stopCh: case <-stopCh:
return return
case <-t.C: case <-t.C():
sawTimeout = true
} }
} }
} }
@ -283,6 +285,92 @@ func contextForChannel(parentCh <-chan struct{}) (context.Context, context.Cance
return ctx, cancel return ctx, cancel
} }
// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
// an interface to return a timer for backoff, and caller shall backoff until Timer.C returns. If the second Backoff()
// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained.
// The BackoffManager is supposed to be called in a single-threaded environment.
type BackoffManager interface {
Backoff() clock.Timer
}
type exponentialBackoffManagerImpl struct {
backoff *Backoff
backoffTimer clock.Timer
lastBackoffStart time.Time
initialBackoff time.Duration
backoffResetDuration time.Duration
clock clock.Clock
}
// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
// This backoff manager is used to reduce load during upstream unhealthiness.
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
return &exponentialBackoffManagerImpl{
backoff: &Backoff{
Duration: initBackoff,
Factor: backoffFactor,
Jitter: jitter,
// the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
// what we ideally need here, we set it to max int and assume we will never use up the steps
Steps: math.MaxInt32,
Cap: maxBackoff,
},
backoffTimer: c.NewTimer(0),
initialBackoff: initBackoff,
lastBackoffStart: c.Now(),
backoffResetDuration: resetDuration,
clock: c,
}
}
func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
b.backoff.Steps = math.MaxInt32
b.backoff.Duration = b.initialBackoff
}
b.lastBackoffStart = b.clock.Now()
return b.backoff.Step()
}
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for backoff.
func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
b.backoffTimer.Reset(b.getNextBackoff())
return b.backoffTimer
}
type jitteredBackoffManagerImpl struct {
clock clock.Clock
duration time.Duration
jitter float64
backoffTimer clock.Timer
}
// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
// is negative, backoff will not be jittered.
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
return &jitteredBackoffManagerImpl{
clock: c,
duration: duration,
jitter: jitter,
backoffTimer: c.NewTimer(0),
}
}
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
jitteredPeriod := j.duration
if j.jitter > 0.0 {
jitteredPeriod = Jitter(j.duration, j.jitter)
}
return jitteredPeriod
}
func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
j.backoffTimer.Reset(j.getNextBackoff())
return j.backoffTimer
}
// ExponentialBackoff repeats a condition check with exponential backoff. // ExponentialBackoff repeats a condition check with exponential backoff.
// //
// It repeatedly checks the condition and then sleeps, using `backoff.Step()` // It repeatedly checks the condition and then sleeps, using `backoff.Step()`
@ -503,16 +591,3 @@ func poller(interval, timeout time.Duration) WaitFunc {
return ch return ch
}) })
} }
// resetOrReuseTimer avoids allocating a new timer if one is already in use.
// Not safe for multiple threads.
func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
if t == nil {
return time.NewTimer(d)
}
if !t.Stop() && !sawTimeout {
<-t.C
}
t.Reset(d)
return t
}

View File

@ -26,6 +26,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
) )
@ -694,3 +695,39 @@ func TestContextForChannel(t *testing.T) {
t.Errorf("unexepcted timeout waiting for parent to cancel child contexts") t.Errorf("unexepcted timeout waiting for parent to cancel child contexts")
} }
} }
func TestExponentialBackoffManagerGetNextBackoff(t *testing.T) {
fc := clock.NewFakeClock(time.Now())
backoff := NewExponentialBackoffManager(1, 10, 10, 2.0, 0.0, fc)
durations := []time.Duration{1, 2, 4, 8, 10, 10, 10}
for i := 0; i < len(durations); i++ {
generatedBackoff := backoff.(*exponentialBackoffManagerImpl).getNextBackoff()
if generatedBackoff != durations[i] {
t.Errorf("unexpected %d-th backoff: %d, expecting %d", i, generatedBackoff, durations[i])
}
}
fc.Step(11)
resetDuration := backoff.(*exponentialBackoffManagerImpl).getNextBackoff()
if resetDuration != 1 {
t.Errorf("after reset, backoff should be 1, but got %d", resetDuration)
}
}
func TestJitteredBackoffManagerGetNextBackoff(t *testing.T) {
// positive jitter
backoffMgr := NewJitteredBackoffManager(1, 1, clock.NewFakeClock(time.Now()))
for i := 0; i < 5; i++ {
backoff := backoffMgr.(*jitteredBackoffManagerImpl).getNextBackoff()
if backoff < 1 || backoff > 2 {
t.Errorf("backoff out of range: %d", backoff)
}
}
// negative jitter, shall be a fixed backoff
backoffMgr = NewJitteredBackoffManager(1, -1, clock.NewFakeClock(time.Now()))
backoff := backoffMgr.(*jitteredBackoffManagerImpl).getNextBackoff()
if backoff != 1 {
t.Errorf("backoff should be 1, but got %d", backoff)
}
}