mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-19 09:38:39 +00:00
Merge pull request #110100 from tkashem/client-go-backoff-fix
client-go: fix backoff delay Kubernetes-commit: 9997897f44976f8969fcc79678cddc291987fab4
This commit is contained in:
commit
33115b49ac
4
go.mod
4
go.mod
@ -25,7 +25,7 @@ require (
|
|||||||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
|
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
|
||||||
google.golang.org/protobuf v1.27.1
|
google.golang.org/protobuf v1.27.1
|
||||||
k8s.io/api v0.0.0-20220512153301-be84346886a4
|
k8s.io/api v0.0.0-20220512153301-be84346886a4
|
||||||
k8s.io/apimachinery v0.0.0-20220517160406-e11374f12506
|
k8s.io/apimachinery v0.0.0-20220518000438-e57249028810
|
||||||
k8s.io/klog/v2 v2.60.1
|
k8s.io/klog/v2 v2.60.1
|
||||||
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42
|
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42
|
||||||
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
|
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
|
||||||
@ -64,5 +64,5 @@ require (
|
|||||||
|
|
||||||
replace (
|
replace (
|
||||||
k8s.io/api => k8s.io/api v0.0.0-20220512153301-be84346886a4
|
k8s.io/api => k8s.io/api v0.0.0-20220512153301-be84346886a4
|
||||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220517160406-e11374f12506
|
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220518000438-e57249028810
|
||||||
)
|
)
|
||||||
|
4
go.sum
4
go.sum
@ -513,8 +513,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
|
|||||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||||
k8s.io/api v0.0.0-20220512153301-be84346886a4 h1:ATL01GmGTBjwQjXxfD0UGj3une2C8WyzTOXXzNFI790=
|
k8s.io/api v0.0.0-20220512153301-be84346886a4 h1:ATL01GmGTBjwQjXxfD0UGj3une2C8WyzTOXXzNFI790=
|
||||||
k8s.io/api v0.0.0-20220512153301-be84346886a4/go.mod h1:swy7cXqCDjYcBv6ylU8ErrJ2ALa+9PSVEdM4zw65jFQ=
|
k8s.io/api v0.0.0-20220512153301-be84346886a4/go.mod h1:swy7cXqCDjYcBv6ylU8ErrJ2ALa+9PSVEdM4zw65jFQ=
|
||||||
k8s.io/apimachinery v0.0.0-20220517160406-e11374f12506 h1:A+vVaBjqEcFqPLVX9RBO98INUqJCFLlI4XDnH+dDclY=
|
k8s.io/apimachinery v0.0.0-20220518000438-e57249028810 h1:AaArI0wyQ6/2bFlQ0mKgDTYNi8ibRDMBR29PHD6ZesI=
|
||||||
k8s.io/apimachinery v0.0.0-20220517160406-e11374f12506/go.mod h1:1oBVxgNUfLl978lJAlywA+H45m2ctSuqJU2stpbcjT4=
|
k8s.io/apimachinery v0.0.0-20220518000438-e57249028810/go.mod h1:1oBVxgNUfLl978lJAlywA+H45m2ctSuqJU2stpbcjT4=
|
||||||
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
|
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
|
||||||
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
||||||
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
|
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
|
||||||
|
@ -1392,7 +1392,9 @@ func TestCheckRetryClosesBody(t *testing.T) {
|
|||||||
defer testServer.Close()
|
defer testServer.Close()
|
||||||
|
|
||||||
backoff := &testBackoffManager{}
|
backoff := &testBackoffManager{}
|
||||||
expectedSleeps := []time.Duration{0, time.Second, 0, time.Second, 0, time.Second, 0, time.Second, 0}
|
|
||||||
|
// testBackoffManager.CalculateBackoff always returns 0
|
||||||
|
expectedSleeps := []time.Duration{0, time.Second, time.Second, time.Second, time.Second}
|
||||||
|
|
||||||
c := testRESTClient(t, testServer)
|
c := testRESTClient(t, testServer)
|
||||||
c.createBackoffMgr = func() BackoffManager { return backoff }
|
c.createBackoffMgr = func() BackoffManager { return backoff }
|
||||||
@ -1440,10 +1442,12 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
// We have a sleep before each retry (including the initial one) and for
|
if count != 3 {
|
||||||
// every "retry-after" call - thus 5 together.
|
t.Errorf("Expected 3 attempts, got: %d", count)
|
||||||
if len(backoff.sleeps) != 5 {
|
}
|
||||||
t.Errorf("Expected 5 retries, got: %d", len(backoff.sleeps))
|
// We have a sleep before each retry (including the initial one) thus 3 together.
|
||||||
|
if len(backoff.sleeps) != 3 {
|
||||||
|
t.Errorf("Expected 3 backoff.Sleep, got: %d", len(backoff.sleeps))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2824,7 +2828,8 @@ type withRateLimiterBackoffManagerAndMetrics struct {
|
|||||||
flowcontrol.RateLimiter
|
flowcontrol.RateLimiter
|
||||||
*NoBackoff
|
*NoBackoff
|
||||||
metrics.ResultMetric
|
metrics.ResultMetric
|
||||||
backoffWaitSeconds int
|
calculateBackoffSeq int64
|
||||||
|
calculateBackoffFn func(i int64) time.Duration
|
||||||
|
|
||||||
invokeOrderGot []string
|
invokeOrderGot []string
|
||||||
sleepsGot []string
|
sleepsGot []string
|
||||||
@ -2839,9 +2844,8 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) err
|
|||||||
func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
|
func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
|
||||||
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
|
lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
|
||||||
|
|
||||||
// we simulate a sleep sequence of 0m, 2m, 4m, 6m, ...
|
waitFor := lb.calculateBackoffFn(lb.calculateBackoffSeq)
|
||||||
waitFor := time.Duration(lb.backoffWaitSeconds) * time.Minute
|
lb.calculateBackoffSeq++
|
||||||
lb.backoffWaitSeconds += 2
|
|
||||||
return waitFor
|
return waitFor
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2868,14 +2872,16 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
|
|||||||
|
|
||||||
func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
|
func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
|
||||||
type expected struct {
|
type expected struct {
|
||||||
attempts int
|
attempts int
|
||||||
order []string
|
order []string
|
||||||
|
sleeps []string
|
||||||
|
statusCodes []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// we define the expected order of how the client invokes the
|
// we define the expected order of how the client invokes the
|
||||||
// rate limiter, backoff, and metrics methods.
|
// rate limiter, backoff, and metrics methods.
|
||||||
// scenario:
|
// scenario:
|
||||||
// - A: original request fails with a retryable response: (500, 'Retry-After: 1')
|
// - A: original request fails with a retryable response: (500, 'Retry-After: N')
|
||||||
// - B: retry 1: successful with a status code 200
|
// - B: retry 1: successful with a status code 200
|
||||||
// so we have a total of 2 attempts
|
// so we have a total of 2 attempts
|
||||||
invokeOrderWant := []string{
|
invokeOrderWant := []string{
|
||||||
@ -2887,17 +2893,16 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
|
|||||||
"BackoffManager.Sleep",
|
"BackoffManager.Sleep",
|
||||||
|
|
||||||
// A: first attempt for which the server sends a retryable response
|
// A: first attempt for which the server sends a retryable response
|
||||||
|
// status code: 500, Retry-Afer: N
|
||||||
"Client.Do",
|
"Client.Do",
|
||||||
|
|
||||||
// we got a response object, status code: 500, Retry-Afer: 1
|
// we got a response object, status code: 500, Retry-Afer: N
|
||||||
// - call metrics method with appropriate status code
|
// - call metrics method with appropriate status code
|
||||||
// - update backoff parameters with the status code returned
|
// - update backoff parameters with the status code returned
|
||||||
// - sleep for N seconds from 'Retry-After: N' response header
|
|
||||||
"RequestResult.Increment",
|
"RequestResult.Increment",
|
||||||
"BackoffManager.UpdateBackoff",
|
"BackoffManager.UpdateBackoff",
|
||||||
"BackoffManager.Sleep",
|
|
||||||
// sleep for delay dictated by backoff parameters
|
|
||||||
"BackoffManager.CalculateBackoff",
|
"BackoffManager.CalculateBackoff",
|
||||||
|
// sleep for delay=max(BackoffManager.CalculateBackoff, Retry-After: N)
|
||||||
"BackoffManager.Sleep",
|
"BackoffManager.Sleep",
|
||||||
// wait as dictated by the client rate lmiter
|
// wait as dictated by the client rate lmiter
|
||||||
"RateLimiter.Wait",
|
"RateLimiter.Wait",
|
||||||
@ -2910,46 +2915,104 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
|
|||||||
"RequestResult.Increment",
|
"RequestResult.Increment",
|
||||||
"BackoffManager.UpdateBackoff",
|
"BackoffManager.UpdateBackoff",
|
||||||
}
|
}
|
||||||
sleepWant := []string{
|
|
||||||
// initial backoff.Sleep before we send the request to the server for the first time
|
|
||||||
"0s",
|
|
||||||
// from 'Retry-After: 1' response header (A)
|
|
||||||
(1 * time.Second).String(),
|
|
||||||
// backoff.Sleep before retry 1 (B)
|
|
||||||
(2 * time.Minute).String(),
|
|
||||||
}
|
|
||||||
statusCodesWant := []string{
|
statusCodesWant := []string{
|
||||||
"500",
|
"500",
|
||||||
"200",
|
"200",
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
maxRetries int
|
maxRetries int
|
||||||
serverReturns []responseErr
|
serverReturns []responseErr
|
||||||
|
calculateBackoffFn func(i int64) time.Duration
|
||||||
// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
|
// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
|
||||||
expectations map[string]expected
|
expectations map[string]expected
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "success after one retry",
|
name: "success after one retry, Retry-After: N > BackoffManager.CalculateBackoff",
|
||||||
maxRetries: 1,
|
maxRetries: 1,
|
||||||
serverReturns: []responseErr{
|
serverReturns: []responseErr{
|
||||||
{response: retryAfterResponse(), err: nil},
|
{response: retryAfterResponseWithDelay("5"), err: nil},
|
||||||
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
|
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
|
||||||
},
|
},
|
||||||
|
// we simulate a sleep sequence of 0s, 1s, 2s, 3s, ...
|
||||||
|
calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(time.Second)) },
|
||||||
expectations: map[string]expected{
|
expectations: map[string]expected{
|
||||||
"Do": {
|
"Do": {
|
||||||
attempts: 2,
|
attempts: 2,
|
||||||
order: invokeOrderWant,
|
order: invokeOrderWant,
|
||||||
|
statusCodes: statusCodesWant,
|
||||||
|
sleeps: []string{
|
||||||
|
// initial backoff.Sleep before we send the request to the server for the first time
|
||||||
|
"0s",
|
||||||
|
// maximum of:
|
||||||
|
// - 'Retry-After: 5' response header from (A)
|
||||||
|
// - BackoffManager.CalculateBackoff (will return 1s)
|
||||||
|
(5 * time.Second).String(),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"Watch": {
|
"Watch": {
|
||||||
attempts: 2,
|
attempts: 2,
|
||||||
// Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
|
// Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
|
||||||
order: invokeOrderWant[1:],
|
order: invokeOrderWant[1:],
|
||||||
|
statusCodes: statusCodesWant,
|
||||||
|
sleeps: []string{
|
||||||
|
"0s",
|
||||||
|
(5 * time.Second).String(),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"Stream": {
|
"Stream": {
|
||||||
|
attempts: 2,
|
||||||
|
order: invokeOrderWant,
|
||||||
|
statusCodes: statusCodesWant,
|
||||||
|
sleeps: []string{
|
||||||
|
"0s",
|
||||||
|
(5 * time.Second).String(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "success after one retry, Retry-After: N < BackoffManager.CalculateBackoff",
|
||||||
|
maxRetries: 1,
|
||||||
|
serverReturns: []responseErr{
|
||||||
|
{response: retryAfterResponseWithDelay("2"), err: nil},
|
||||||
|
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
|
||||||
|
},
|
||||||
|
// we simulate a sleep sequence of 0s, 4s, 8s, 16s, ...
|
||||||
|
calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(4*time.Second)) },
|
||||||
|
expectations: map[string]expected{
|
||||||
|
"Do": {
|
||||||
|
attempts: 2,
|
||||||
|
order: invokeOrderWant,
|
||||||
|
statusCodes: statusCodesWant,
|
||||||
|
sleeps: []string{
|
||||||
|
// initial backoff.Sleep before we send the request to the server for the first time
|
||||||
|
"0s",
|
||||||
|
// maximum of:
|
||||||
|
// - 'Retry-After: 2' response header from (A)
|
||||||
|
// - BackoffManager.CalculateBackoff (will return 4s)
|
||||||
|
(4 * time.Second).String(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"Watch": {
|
||||||
attempts: 2,
|
attempts: 2,
|
||||||
order: invokeOrderWant,
|
// Watch does not do 'RateLimiter.Wait' before initially sending the request to the server
|
||||||
|
order: invokeOrderWant[1:],
|
||||||
|
statusCodes: statusCodesWant,
|
||||||
|
sleeps: []string{
|
||||||
|
"0s",
|
||||||
|
(4 * time.Second).String(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"Stream": {
|
||||||
|
attempts: 2,
|
||||||
|
order: invokeOrderWant,
|
||||||
|
statusCodes: statusCodesWant,
|
||||||
|
sleeps: []string{
|
||||||
|
"0s",
|
||||||
|
(4 * time.Second).String(),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -2958,8 +3021,9 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
|
|||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
interceptor := &withRateLimiterBackoffManagerAndMetrics{
|
interceptor := &withRateLimiterBackoffManagerAndMetrics{
|
||||||
RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
||||||
NoBackoff: &NoBackoff{},
|
NoBackoff: &NoBackoff{},
|
||||||
|
calculateBackoffFn: test.calculateBackoffFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: today this is the only site where a test overrides the
|
// TODO: today this is the only site where a test overrides the
|
||||||
@ -3027,11 +3091,11 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
|
|||||||
if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
|
if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
|
||||||
t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
|
t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
|
||||||
}
|
}
|
||||||
if !cmp.Equal(sleepWant, interceptor.sleepsGot) {
|
if !cmp.Equal(want.sleeps, interceptor.sleepsGot) {
|
||||||
t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(sleepWant, interceptor.sleepsGot))
|
t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(want.sleeps, interceptor.sleepsGot))
|
||||||
}
|
}
|
||||||
if !cmp.Equal(statusCodesWant, interceptor.statusCodesGot) {
|
if !cmp.Equal(want.statusCodes, interceptor.statusCodesGot) {
|
||||||
t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(statusCodesWant, interceptor.statusCodesGot))
|
t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(want.statusCodes, interceptor.statusCodesGot))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -204,7 +204,9 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
|
|||||||
if r.retryAfter == nil {
|
if r.retryAfter == nil {
|
||||||
// we do a backoff sleep before the first attempt is made,
|
// we do a backoff sleep before the first attempt is made,
|
||||||
// (preserving current behavior).
|
// (preserving current behavior).
|
||||||
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
|
if request.backoff != nil {
|
||||||
|
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -222,12 +224,11 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
|
|||||||
|
|
||||||
// if we are here, we have made attempt(s) al least once before.
|
// if we are here, we have made attempt(s) al least once before.
|
||||||
if request.backoff != nil {
|
if request.backoff != nil {
|
||||||
// TODO(tkashem) with default set to use exponential backoff
|
delay := request.backoff.CalculateBackoff(url)
|
||||||
// we can merge these two sleeps:
|
if r.retryAfter.Wait > delay {
|
||||||
// BackOffManager.Sleep(max(backoffManager.CalculateBackoff(), retryAfter))
|
delay = r.retryAfter.Wait
|
||||||
// see https://github.com/kubernetes/kubernetes/issues/108302
|
}
|
||||||
request.backoff.Sleep(r.retryAfter.Wait)
|
request.backoff.Sleep(delay)
|
||||||
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We are retrying the request that we already send to
|
// We are retrying the request that we already send to
|
||||||
@ -349,8 +350,12 @@ func readAndCloseResponseBody(resp *http.Response) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func retryAfterResponse() *http.Response {
|
func retryAfterResponse() *http.Response {
|
||||||
|
return retryAfterResponseWithDelay("1")
|
||||||
|
}
|
||||||
|
|
||||||
|
func retryAfterResponseWithDelay(delay string) *http.Response {
|
||||||
return &http.Response{
|
return &http.Response{
|
||||||
StatusCode: http.StatusInternalServerError,
|
StatusCode: http.StatusInternalServerError,
|
||||||
Header: http.Header{"Retry-After": []string{"1"}},
|
Header: http.Header{"Retry-After": []string{delay}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user