mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-04 02:37:39 +00:00
Add exp backoff for connection refused errors
Currently when ListAndWatch() receives a connection refused error, it is assumed to be due to the apiserver being transiently unresponsive. In situations where a controller is running outside the k8s cluster it's controlling, it is more common for the controller to permanently lose its connection to the apiserver, and it needs to exponentially back off its retries rather than continuously spamming logs with Watch attempts that will never succeed. Kubernetes-commit: 1ff789f2bb9bf7fbb3df35977bc249c0dd019d31
This commit is contained in:
parent
bb860d7b6e
commit
4d01b584c3
15
tools/cache/reflector.go
vendored
15
tools/cache/reflector.go
vendored
@ -69,6 +69,8 @@ type Reflector struct {
|
|||||||
|
|
||||||
// backoff manages backoff of ListWatch
|
// backoff manages backoff of ListWatch
|
||||||
backoffManager wait.BackoffManager
|
backoffManager wait.BackoffManager
|
||||||
|
// initConnBackoffManager manages backoff of the initial connection with the Watch call of ListAndWatch.
|
||||||
|
initConnBackoffManager wait.BackoffManager
|
||||||
|
|
||||||
resyncPeriod time.Duration
|
resyncPeriod time.Duration
|
||||||
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
|
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
|
||||||
@ -166,10 +168,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
|
|||||||
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
|
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
|
||||||
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
|
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
|
||||||
// ~0.022 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
|
// ~0.022 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
|
||||||
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
||||||
resyncPeriod: resyncPeriod,
|
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
||||||
clock: realClock,
|
resyncPeriod: resyncPeriod,
|
||||||
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
clock: realClock,
|
||||||
|
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
||||||
}
|
}
|
||||||
r.setExpectedType(expectedType)
|
r.setExpectedType(expectedType)
|
||||||
return r
|
return r
|
||||||
@ -404,9 +407,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
||||||
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
||||||
// watch where we ended.
|
// watch where we ended.
|
||||||
// If that's the case wait and resend watch request.
|
// If that's the case begin exponentially backing off and resend watch request.
|
||||||
if utilnet.IsConnectionRefused(err) {
|
if utilnet.IsConnectionRefused(err) {
|
||||||
time.Sleep(time.Second)
|
<-r.initConnBackoffManager.Backoff().C()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
80
tools/cache/reflector_test.go
vendored
80
tools/cache/reflector_test.go
vendored
@ -22,15 +22,17 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
@ -358,6 +360,82 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
|
||||||
|
maxBackoff := 50 * time.Millisecond
|
||||||
|
table := []struct {
|
||||||
|
numConnFails int
|
||||||
|
expLowerBound time.Duration
|
||||||
|
expUpperBound time.Duration
|
||||||
|
}{
|
||||||
|
{5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially
|
||||||
|
{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff}, // case where maxBackoff is hit, backoff time should flatten
|
||||||
|
|
||||||
|
}
|
||||||
|
for _, test := range table {
|
||||||
|
t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
|
||||||
|
func(t *testing.T) {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
connFails := test.numConnFails
|
||||||
|
fakeClock := clock.NewFakeClock(time.Unix(0, 0))
|
||||||
|
bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
|
||||||
|
done := make(chan struct{})
|
||||||
|
defer close(done)
|
||||||
|
go func() {
|
||||||
|
i := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if fakeClock.HasWaiters() {
|
||||||
|
step := (1 << (i + 1)) * time.Millisecond
|
||||||
|
if step > maxBackoff*2 {
|
||||||
|
step = maxBackoff * 2
|
||||||
|
}
|
||||||
|
fakeClock.Step(step)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Microsecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
lw := &testLW{
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
if connFails > 0 {
|
||||||
|
connFails--
|
||||||
|
return nil, syscall.ECONNREFUSED
|
||||||
|
}
|
||||||
|
close(stopCh)
|
||||||
|
return watch.NewFake(), nil
|
||||||
|
},
|
||||||
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r := &Reflector{
|
||||||
|
name: "test-reflector",
|
||||||
|
listerWatcher: lw,
|
||||||
|
store: NewFIFO(MetaNamespaceKeyFunc),
|
||||||
|
initConnBackoffManager: bm,
|
||||||
|
clock: fakeClock,
|
||||||
|
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
||||||
|
}
|
||||||
|
start := fakeClock.Now()
|
||||||
|
err := r.ListAndWatch(stopCh)
|
||||||
|
elapsed := fakeClock.Since(start)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error %v", err)
|
||||||
|
}
|
||||||
|
if elapsed < (test.expLowerBound) {
|
||||||
|
t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
|
||||||
|
}
|
||||||
|
if elapsed > (test.expUpperBound) {
|
||||||
|
t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflectorResync(t *testing.T) {
|
func TestReflectorResync(t *testing.T) {
|
||||||
iteration := 0
|
iteration := 0
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
|
Loading…
Reference in New Issue
Block a user