Merge pull request #118132 from marseel/improve_reflector_retries

Improve backoff policy in reflector.

Kubernetes-commit: 7f2a1e8cd3363b04d970c619daf9f9d5ee5d50c0
This commit is contained in:
Kubernetes Publisher 2023-05-29 08:23:44 -07:00
commit f21df6e02d
4 changed files with 30 additions and 33 deletions

4
go.mod
View File

@ -23,7 +23,7 @@ require (
golang.org/x/term v0.7.0 golang.org/x/term v0.7.0
golang.org/x/time v0.3.0 golang.org/x/time v0.3.0
google.golang.org/protobuf v1.30.0 google.golang.org/protobuf v1.30.0
k8s.io/api v0.0.0-20230515170019-2f9553831ec2 k8s.io/api v0.0.0-20230527221710-63505c4e796f
k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53 k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53
k8s.io/klog/v2 v2.100.1 k8s.io/klog/v2 v2.100.1
k8s.io/kube-openapi v0.0.0-20230524182850-78281498afbb k8s.io/kube-openapi v0.0.0-20230524182850-78281498afbb
@ -60,6 +60,6 @@ require (
) )
replace ( replace (
k8s.io/api => k8s.io/api v0.0.0-20230515170019-2f9553831ec2 k8s.io/api => k8s.io/api v0.0.0-20230527221710-63505c4e796f
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53
) )

4
go.sum
View File

@ -210,8 +210,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20230515170019-2f9553831ec2 h1:ZHyoWIXElhnLI03GkOsebYT1uqYowGjuVCbxBqcjjGw= k8s.io/api v0.0.0-20230527221710-63505c4e796f h1:5n05Z8OLrrVvkQF6z0OgUK3yGzRm7GY1bUwhUBp3g5g=
k8s.io/api v0.0.0-20230515170019-2f9553831ec2/go.mod h1:qZNz3vKcyZb5wFaBlHqUjHk7D5XhIaXu/d/df6PjNKk= k8s.io/api v0.0.0-20230527221710-63505c4e796f/go.mod h1:iclI1239YG5Z5YyOH/tKJi2vGhQ0k0dl+1xSvFaR6Dk=
k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53 h1:8/61R28R74ZzZ1WiUVu4zqCtyQSdVCBa4qCNjPLjOsU= k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53 h1:8/61R28R74ZzZ1WiUVu4zqCtyQSdVCBa4qCNjPLjOsU=
k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53/go.mod h1:qCTDst7QeP2n3JDxbpuJSTTaAxalvFKMTN0Lga1+0zU= k8s.io/apimachinery v0.0.0-20230526141509-403fb36f6a53/go.mod h1:qCTDst7QeP2n3JDxbpuJSTTaAxalvFKMTN0Lga1+0zU=
k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=

View File

@ -70,8 +70,6 @@ type Reflector struct {
listerWatcher ListerWatcher listerWatcher ListerWatcher
// backoff manages backoff of ListWatch // backoff manages backoff of ListWatch
backoffManager wait.BackoffManager backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch.
initConnBackoffManager wait.BackoffManager
resyncPeriod time.Duration resyncPeriod time.Duration
// clock allows tests to manipulate time // clock allows tests to manipulate time
clock clock.Clock clock clock.Clock
@ -222,7 +220,6 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock), backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
clock: reflectorClock, clock: reflectorClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType), expectedType: reflect.TypeOf(expectedType),
@ -425,7 +422,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
select { select {
case <-stopCh: case <-stopCh:
return nil return nil
case <-r.initConnBackoffManager.Backoff().C(): case <-r.backoffManager.Backoff().C():
continue continue
} }
} }
@ -451,7 +448,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
select { select {
case <-stopCh: case <-stopCh:
return nil return nil
case <-r.initConnBackoffManager.Backoff().C(): case <-r.backoffManager.Backoff().C():
continue continue
} }
case apierrors.IsInternalError(err) && retry.ShouldRetry(): case apierrors.IsInternalError(err) && retry.ShouldRetry():
@ -604,7 +601,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
isErrorRetriableWithSideEffectsFn := func(err error) bool { isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry { if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err) klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
<-r.initConnBackoffManager.Backoff().C() <-r.backoffManager.Backoff().C()
return true return true
} }
if isExpiredError(err) || isTooLargeResourceVersionError(err) { if isExpiredError(err) || isTooLargeResourceVersionError(err) {

View File

@ -414,7 +414,7 @@ func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
name: "test-reflector", name: "test-reflector",
listerWatcher: lw, listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm, backoffManager: bm,
clock: fakeClock, clock: fakeClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
} }
@ -474,7 +474,7 @@ func TestBackoffOnTooManyRequests(t *testing.T) {
name: "test-reflector", name: "test-reflector",
listerWatcher: lw, listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm, backoffManager: bm,
clock: clock, clock: clock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
} }
@ -543,7 +543,7 @@ func TestRetryInternalError(t *testing.T) {
name: "test-reflector", name: "test-reflector",
listerWatcher: lw, listerWatcher: lw,
store: NewFIFO(MetaNamespaceKeyFunc), store: NewFIFO(MetaNamespaceKeyFunc),
initConnBackoffManager: bm, backoffManager: bm,
clock: fakeClock, clock: fakeClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
} }