mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Merge pull request #94235 from kevindelgado/draft/connection-refused-backoff
Add exponential backoff for connection refused errors
This commit is contained in:
commit
e688a0692c
@ -69,6 +69,8 @@ type Reflector struct {
|
|||||||
|
|
||||||
// backoff manages backoff of ListWatch
|
// backoff manages backoff of ListWatch
|
||||||
backoffManager wait.BackoffManager
|
backoffManager wait.BackoffManager
|
||||||
|
// initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch.
|
||||||
|
initConnBackoffManager wait.BackoffManager
|
||||||
|
|
||||||
resyncPeriod time.Duration
|
resyncPeriod time.Duration
|
||||||
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
|
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
|
||||||
@ -166,10 +168,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
|
|||||||
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
|
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
|
||||||
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
|
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
|
||||||
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
|
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
|
||||||
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
||||||
resyncPeriod: resyncPeriod,
|
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
|
||||||
clock: realClock,
|
resyncPeriod: resyncPeriod,
|
||||||
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
clock: realClock,
|
||||||
|
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
||||||
}
|
}
|
||||||
r.setExpectedType(expectedType)
|
r.setExpectedType(expectedType)
|
||||||
return r
|
return r
|
||||||
@ -404,9 +407,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
||||||
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
// It doesn't make sense to re-list all objects because most likely we will be able to restart
|
||||||
// watch where we ended.
|
// watch where we ended.
|
||||||
// If that's the case wait and resend watch request.
|
// If that's the case begin exponentially backing off and resend watch request.
|
||||||
if utilnet.IsConnectionRefused(err) {
|
if utilnet.IsConnectionRefused(err) {
|
||||||
time.Sleep(time.Second)
|
<-r.initConnBackoffManager.Backoff().C()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -22,15 +22,17 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
@ -358,6 +360,82 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
|
||||||
|
maxBackoff := 50 * time.Millisecond
|
||||||
|
table := []struct {
|
||||||
|
numConnFails int
|
||||||
|
expLowerBound time.Duration
|
||||||
|
expUpperBound time.Duration
|
||||||
|
}{
|
||||||
|
{5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially
|
||||||
|
{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff}, // case where maxBoff is hit, backoff time should flatten
|
||||||
|
|
||||||
|
}
|
||||||
|
for _, test := range table {
|
||||||
|
t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
|
||||||
|
func(t *testing.T) {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
connFails := test.numConnFails
|
||||||
|
fakeClock := clock.NewFakeClock(time.Unix(0, 0))
|
||||||
|
bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
|
||||||
|
done := make(chan struct{})
|
||||||
|
defer close(done)
|
||||||
|
go func() {
|
||||||
|
i := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if fakeClock.HasWaiters() {
|
||||||
|
step := (1 << (i + 1)) * time.Millisecond
|
||||||
|
if step > maxBackoff*2 {
|
||||||
|
step = maxBackoff * 2
|
||||||
|
}
|
||||||
|
fakeClock.Step(step)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Microsecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
lw := &testLW{
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
if connFails > 0 {
|
||||||
|
connFails--
|
||||||
|
return nil, syscall.ECONNREFUSED
|
||||||
|
}
|
||||||
|
close(stopCh)
|
||||||
|
return watch.NewFake(), nil
|
||||||
|
},
|
||||||
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r := &Reflector{
|
||||||
|
name: "test-reflector",
|
||||||
|
listerWatcher: lw,
|
||||||
|
store: NewFIFO(MetaNamespaceKeyFunc),
|
||||||
|
initConnBackoffManager: bm,
|
||||||
|
clock: fakeClock,
|
||||||
|
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
|
||||||
|
}
|
||||||
|
start := fakeClock.Now()
|
||||||
|
err := r.ListAndWatch(stopCh)
|
||||||
|
elapsed := fakeClock.Since(start)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error %v", err)
|
||||||
|
}
|
||||||
|
if elapsed < (test.expLowerBound) {
|
||||||
|
t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
|
||||||
|
}
|
||||||
|
if elapsed > (test.expUpperBound) {
|
||||||
|
t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflectorResync(t *testing.T) {
|
func TestReflectorResync(t *testing.T) {
|
||||||
iteration := 0
|
iteration := 0
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
|
Loading…
Reference in New Issue
Block a user