client-go: Replace deprecated BackoffManager with DelayFunc in Reflector

Move backoff documentation comments to var block for better discoverability.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Kubernetes-commit: 33ebd41b2c1abdc03beabd9ccff3428a8fd46473
Author: Christian Nuss
Date: 2026-02-07 12:04:08 -05:00
Committed by: Kubernetes Publisher
Parent: fd1b7118fd
Commit: b5668cea26
2 changed files with 168 additions and 38 deletions
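For orientation, a minimal sketch of the API this change adopts: a wait.Backoff turned into a wait.DelayFunc via DelayWithReset, using the same default values the diff introduces. The package and helper name below are illustrative, not part of the commit.

package example

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/clock"
)

// newReflectorDelay builds the wait.DelayFunc that replaces the deprecated
// wait.NewExponentialBackoffManager(800ms, 30s, 2m, 2.0, 1.0, clock).
func newReflectorDelay(c clock.Clock) wait.DelayFunc {
	backoff := wait.Backoff{
		Duration: 800 * time.Millisecond, // initial retry delay
		Factor:   2.0,                    // exponential growth per attempt
		Jitter:   1.0,                    // each delay gets up to +100% random jitter
		Steps:    38,                     // ceil(30s / 800ms), as computed in the diff
		Cap:      30 * time.Second,       // delays never exceed this
	}
	// Reset to the initial delay if 2 minutes pass between calls, mirroring
	// the reset behavior of the old BackoffManager.
	return backoff.DelayWithReset(c, 2*time.Minute)
}

The reflector then asks this function for the next interval (or calls its Until method) instead of holding a BackoffManager.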


@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"reflect"
"strings"
@@ -50,9 +51,21 @@ import (
const defaultExpectedTypeName = "<unspecified>"
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
var defaultMinWatchTimeout = 5 * time.Minute
var (
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
defaultMinWatchTimeout = 5 * time.Minute
defaultMaxWatchTimeout = 2 * defaultMinWatchTimeout
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS.
defaultBackoffInit = 800 * time.Millisecond
defaultBackoffMax = 30 * time.Second
// If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
defaultBackoffReset = 2 * time.Minute
defaultBackoffFactor = 2.0
defaultBackoffJitter = 1.0
)
// ReflectorStore is the subset of cache.Store that the reflector uses
type ReflectorStore interface {
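To make the QPS comment above concrete (an illustrative calculation, not part of the diff): with Jitter: 1.0 each real delay is the base value plus up to 100% extra, so once the base is pinned at the 30s cap the actual delay falls in [30s, 60s), roughly one attempt every 45s (about 0.02 QPS), which is the ~98% reduction from the old 1 QPS retry loop. The jitter-free base progression under these defaults:

package main

import (
	"fmt"
	"time"
)

func main() {
	d := 800 * time.Millisecond // defaultBackoffInit
	for i := 0; i < 8; i++ {
		fmt.Println(d) // 800ms 1.6s 3.2s 6.4s 12.8s 25.6s 30s 30s
		d *= 2 // defaultBackoffFactor
		if d > 30*time.Second { // defaultBackoffMax
			d = 30 * time.Second
		}
	}
}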
@@ -110,11 +123,14 @@ type Reflector struct {
store ReflectorStore
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcherWithContext
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
resyncPeriod time.Duration
// delay returns the next backoff interval for retries.
resyncPeriod time.Duration
delayHandler wait.DelayFunc
// minWatchTimeout defines the minimum timeout for watch requests.
minWatchTimeout time.Duration
// maxWatchTimeout defines the maximum timeout for watch requests.
// Actual timeout is random in [minWatchTimeout, maxWatchTimeout].
maxWatchTimeout time.Duration
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
@@ -259,6 +275,12 @@ type ReflectorOptions struct {
// Clock allows tests to control time. If unset defaults to clock.RealClock{}
Clock clock.Clock
// Backoff is an optional custom backoff configuration.
// If set, it will be used instead of the default exponential backoff.
// DelayWithReset(clock, resetDuration) will be called on it to create the delay function.
// TODO(#136943): Expose this configuration through SharedInformerFactory.
Backoff *wait.Backoff
}
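A sketch of how a caller could supply the new option directly; the helper name and the concrete backoff values are illustrative, and per the TODO above nothing is exposed through SharedInformerFactory yet.

package example

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

// newPodReflector builds a Reflector whose retry backoff starts at 250ms
// and caps at 10s instead of the 800ms/30s defaults.
func newPodReflector(lw cache.ListerWatcher, store cache.Store) *cache.Reflector {
	return cache.NewReflectorWithOptions(lw, &v1.Pod{}, store, cache.ReflectorOptions{
		Name: "pod-reflector",
		Backoff: &wait.Backoff{
			Duration: 250 * time.Millisecond,
			Factor:   2.0,
			Jitter:   0.5,
			Steps:    10,
			Cap:      10 * time.Second,
		},
	})
}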
// NewReflectorWithOptions creates a new Reflector object which will keep the
@@ -276,21 +298,42 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R
if reflectorClock == nil {
reflectorClock = clock.RealClock{}
}
minWatchTimeout := defaultMinWatchTimeout
maxWatchTimeout := defaultMaxWatchTimeout
if options.MinWatchTimeout > defaultMinWatchTimeout {
minWatchTimeout = options.MinWatchTimeout
maxWatchTimeout = 2 * minWatchTimeout
}
if maxWatchTimeout < minWatchTimeout {
klog.TODO().V(3).Info(
"maxWatchTimeout was less than minWatchTimeout, overriding to minWatchTimeout. Watch timeout randomization is disabled.",
"minWatchTimeout", minWatchTimeout,
"maxWatchTimeout", maxWatchTimeout,
)
maxWatchTimeout = minWatchTimeout
}
backoff := options.Backoff
if backoff == nil {
backoff = &wait.Backoff{
Duration: defaultBackoffInit,
Cap: defaultBackoffMax,
Steps: int(math.Ceil(float64(defaultBackoffMax) / float64(defaultBackoffInit))),
Factor: defaultBackoffFactor,
Jitter: defaultBackoffJitter,
}
}
r := &Reflector{
name: options.Name,
resyncPeriod: options.ResyncPeriod,
minWatchTimeout: minWatchTimeout,
typeDescription: options.TypeDescription,
listerWatcher: ToListerWatcherWithContext(lw),
store: store,
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
name: options.Name,
resyncPeriod: options.ResyncPeriod,
minWatchTimeout: minWatchTimeout,
maxWatchTimeout: maxWatchTimeout,
typeDescription: options.TypeDescription,
listerWatcher: ToListerWatcherWithContext(lw),
store: store,
delayHandler: backoff.DelayWithReset(reflectorClock, defaultBackoffReset),
clock: reflectorClock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType),
@@ -380,11 +423,16 @@ func (r *Reflector) Run(stopCh <-chan struct{}) {
func (r *Reflector) RunWithContext(ctx context.Context) {
logger := klog.FromContext(ctx)
logger.V(3).Info("Starting reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
wait.BackoffUntil(func() {
// Until runs the loop immediately (immediate=true) and resets the backoff timer after each
// successful iteration (sliding=true). See backoff constants at top of file for generalized QPS targets (~0.22 QPS).
if err := r.delayHandler.Until(ctx, true, true, func(ctx context.Context) (bool, error) {
if err := r.ListAndWatchWithContext(ctx); err != nil {
r.watchErrorHandler(ctx, r, err)
}
}, r.backoffManager, true, ctx.Done())
return false, nil
}); err != nil {
logger.Error(err, "Reflector stopped with error", "type", r.typeDescription, "reflector", r.name)
}
logger.V(3).Info("Stopping reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
}
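For readers new to the DelayFunc API used above, a standalone sketch of Until with immediate and sliding both true; everything below is illustrative, not reflector code.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/clock"
)

func main() {
	backoff := wait.Backoff{Duration: 100 * time.Millisecond, Factor: 2.0, Steps: 5, Cap: time.Second}
	delay := backoff.DelayWithReset(clock.RealClock{}, time.Minute)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	attempts := 0
	// immediate=true: run the condition once before the first delay.
	// sliding=true: the next delay starts counting after the condition returns.
	err := delay.Until(ctx, true, true, func(ctx context.Context) (bool, error) {
		attempts++
		fmt.Println("attempt", attempts)
		return false, nil // false means retry after the next delay
	})
	fmt.Println(attempts, err) // err is non-nil once the 2s deadline passes
}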
@@ -539,7 +587,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
// if w is already initialized, it must be past any synthetic non-rv-ordered added events
propagateRVFromStart := true
if w == nil {
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
timeoutSeconds := int64(r.minWatchTimeout.Seconds() + rand.Float64()*(r.maxWatchTimeout.Seconds()-r.minWatchTimeout.Seconds()))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not
@@ -563,7 +611,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
case <-r.clock.After(r.delayHandler()):
continue
}
}
@@ -608,7 +656,7 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
case <-r.clock.After(r.delayHandler()):
continue
}
case apierrors.IsInternalError(err) && retry.ShouldRetry():
@@ -762,7 +810,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry {
logger.V(2).Info("watch-list failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
<-r.backoffManager.Backoff().C()
<-r.clock.After(r.delayHandler())
return true
}
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
@@ -798,7 +846,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
// TODO(#115478): large "list", slow clients, slow network, p&f
// might slow down streaming and eventually fail.
// maybe in such a case we should retry with an increased timeout?
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
timeoutSeconds := int64(r.minWatchTimeout.Seconds() + rand.Float64()*(r.maxWatchTimeout.Seconds()-r.minWatchTimeout.Seconds()))
options := metav1.ListOptions{
ResourceVersion: lastKnownRV,
AllowWatchBookmarks: true,
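Both timeout changes in this file replace minWatchTimeout * (rand + 1.0) with an explicit draw from [minWatchTimeout, maxWatchTimeout). With the defaults the two ranges coincide; a worked example, not part of the diff:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	minWatch := 5 * time.Minute  // defaultMinWatchTimeout
	maxWatch := 10 * time.Minute // defaultMaxWatchTimeout = 2 * min
	timeoutSeconds := int64(minWatch.Seconds() + rand.Float64()*(maxWatch.Seconds()-minWatch.Seconds()))
	fmt.Println(timeoutSeconds) // uniform in [300, 600)
}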


@@ -745,7 +745,14 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
ctx, cancel := context.WithCancelCause(ctx)
connFails := test.numConnFails
fakeClock := testingclock.NewFakeClock(time.Unix(0, 0))
bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
backoff := wait.Backoff{
Duration: time.Millisecond,
Cap: maxBackoff,
Steps: 1000, // large number to not run out
Factor: 2.0,
Jitter: 1.0,
}
delayFn := backoff.DelayWithReset(fakeClock, 100*time.Millisecond)
done := make(chan struct{})
defer close(done)
go func() {
@@ -784,7 +791,7 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
delayHandler: delayFn,
clock: fakeClock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}
@@ -804,28 +811,103 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
}
}
type fakeBackoff struct {
clock clock.Clock
func TestNewReflectorWithCustomBackoff(t *testing.T) {
testCases := []struct {
name string
backoff *wait.Backoff
numConnFails int
expLowerBound time.Duration
expUpperBound time.Duration
}{
{
// Default backoff uses jitter so timing is non-deterministic
// Just verify it completes without error
name: "default backoff",
backoff: nil,
numConnFails: 2,
expLowerBound: 0,
expUpperBound: 10 * time.Second,
},
{
// Custom backoff: 10ms initial, 2x factor, no jitter, 100ms cap
// After 5 failures: 10 + 20 + 40 + 80 + 100 = 250ms
name: "custom backoff",
backoff: &wait.Backoff{
Duration: 10 * time.Millisecond,
Factor: 2.0,
Jitter: 0,
Steps: 100,
Cap: 100 * time.Millisecond,
},
numConnFails: 5,
expLowerBound: 250 * time.Millisecond,
expUpperBound: 300 * time.Millisecond,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
connFails := tc.numConnFails
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if connFails > 0 {
connFails--
return nil, syscall.ECONNREFUSED
}
cancel(errors.New("done"))
return watch.NewFake(), nil
},
}
opts := ReflectorOptions{
Backoff: tc.backoff,
}
r := NewReflectorWithOptions(lw, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), opts)
start := time.Now()
err := r.ListAndWatchWithContext(ctx)
elapsed := time.Since(start)
if err != nil {
t.Errorf("unexpected error %v", err)
}
if elapsed < tc.expLowerBound {
t.Errorf("expected lower bound %v, got %v", tc.expLowerBound, elapsed)
}
if elapsed > tc.expUpperBound {
t.Errorf("expected upper bound %v, got %v", tc.expUpperBound, elapsed)
}
})
}
}
type fakeDelayFunc struct {
calls int
}
func (f *fakeBackoff) Backoff() clock.Timer {
func (f *fakeDelayFunc) delayFunc() time.Duration {
f.calls++
return f.clock.NewTimer(time.Duration(0))
return 0
}
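The stub works because wait.DelayFunc is a named func() time.Duration, so the method value fd.delayFunc is assignable to the delayHandler field with no adapter. A minimal standalone illustration (types and names made up):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

type countingDelay struct{ calls int }

// delayFunc matches func() time.Duration, the underlying type of wait.DelayFunc.
func (c *countingDelay) delayFunc() time.Duration {
	c.calls++
	return 0
}

func main() {
	c := &countingDelay{}
	var d wait.DelayFunc = c.delayFunc // method value, no adapter needed
	d()
	d()
	fmt.Println(c.calls) // 2
}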
func TestBackoffOnTooManyRequests(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock}
fd := &fakeDelayFunc{}
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
switch bm.calls {
switch fd.calls {
case 0:
return nil, err
case 1:
@@ -845,7 +927,7 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
delayHandler: fd.delayFunc,
clock: clock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}
@@ -855,15 +937,15 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
t.Fatal(err)
}
close(stopCh)
if bm.calls != 2 {
t.Errorf("unexpected watch backoff calls: %d", bm.calls)
if fd.calls != 2 {
t.Errorf("unexpected watch backoff calls: %d", fd.calls)
}
}
func TestNoRelistOnTooManyRequests(t *testing.T) {
err := apierrors.NewTooManyRequests("too many requests", 1)
clock := &clock.RealClock{}
bm := &fakeBackoff{clock: clock}
fd := &fakeDelayFunc{}
listCalls, watchCalls := 0, 0
lw := &ListWatch{
@@ -886,7 +968,7 @@ func TestNoRelistOnTooManyRequests(t *testing.T) {
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
delayHandler: fd.delayFunc,
clock: clock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}
@@ -933,7 +1015,7 @@ func TestRetryInternalError(t *testing.T) {
for _, tc := range testCases {
err := apierrors.NewInternalError(fmt.Errorf("etcdserver: no leader"))
fakeClock := testingclock.NewFakeClock(time.Now())
bm := &fakeBackoff{clock: fakeClock}
fd := &fakeDelayFunc{}
counter := 0
@@ -961,7 +1043,7 @@ func TestRetryInternalError(t *testing.T) {
name: "test-reflector",
listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc),
backoffManager: bm,
delayHandler: fd.delayFunc,
clock: fakeClock,
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
}