From 34f3aff43e4f1f64b182fafbc2c86bc9b644de6a Mon Sep 17 00:00:00 2001
From: Abu Kashem
Date: Thu, 17 Feb 2022 16:57:45 -0500
Subject: [PATCH] client-go: refactor retry logic for backoff, rate limiter and metric

Kubernetes-commit: cecc563d3b9a9438cd3e6ae1576baa0a36f2d843
---
 rest/request.go         | 112 ++++++---------------
 rest/request_test.go    | 156 ++++++++++++++++++++++++++++++++++++++++
 rest/with_retry.go      | 150 +++++++++++++++++++++++++-----------
 rest/with_retry_test.go |  15 +++-
 4 files changed, 306 insertions(+), 127 deletions(-)

diff --git a/rest/request.go b/rest/request.go
index 7b63ad29..93bed5b9 100644
--- a/rest/request.go
+++ b/rest/request.go
@@ -610,7 +610,6 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
 		}
 		return false
 	}
-	var retryAfter *RetryAfter
 	url := r.URL().String()
 	for {
 		req, err := r.newHTTPRequest(ctx)
@@ -618,26 +617,13 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
 			return nil, err
 		}
 
-		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
-		if retryAfter != nil {
-			// We are retrying the request that we already send to apiserver
-			// at least once before.
-			// This request should also be throttled with the client-internal rate limiter.
-			if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
-				return nil, err
-			}
-			retryAfter = nil
+		if err := r.retry.Before(ctx, r); err != nil {
+			return nil, err
 		}
 
 		resp, err := client.Do(req)
 		updateURLMetrics(ctx, r, resp, err)
-		if r.c.base != nil {
-			if err != nil {
-				r.backoff.UpdateBackoff(r.c.base, err, 0)
-			} else {
-				r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
-			}
-		}
+		r.retry.After(ctx, r, resp, err)
 		if err == nil && resp.StatusCode == http.StatusOK {
 			return r.newStreamWatcher(resp)
 		}
@@ -645,14 +631,8 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
 		done, transformErr := func() (bool, error) {
 			defer readAndCloseResponseBody(resp)
 
-			var retry bool
-			retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
-			if retry {
-				err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
-				if err == nil {
-					return false, nil
-				}
-				klog.V(4).Infof("Could not retry request - %v", err)
+			if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
+				return false, nil
 			}
 
 			if resp == nil {
@@ -738,7 +718,6 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
 		client = http.DefaultClient
 	}
 
-	var retryAfter *RetryAfter
 	url := r.URL().String()
 	for {
 		req, err := r.newHTTPRequest(ctx)
@@ -749,26 +728,13 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
 			req.Body = ioutil.NopCloser(r.body)
 		}
 
-		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
-		if retryAfter != nil {
-			// We are retrying the request that we already send to apiserver
-			// at least once before.
-			// This request should also be throttled with the client-internal rate limiter.
-			if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
-				return nil, err
-			}
-			retryAfter = nil
+		if err := r.retry.Before(ctx, r); err != nil {
+			return nil, err
 		}
 
 		resp, err := client.Do(req)
 		updateURLMetrics(ctx, r, resp, err)
-		if r.c.base != nil {
-			if err != nil {
-				r.backoff.UpdateBackoff(r.URL(), err, 0)
-			} else {
-				r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
-			}
-		}
+		r.retry.After(ctx, r, resp, err)
 		if err != nil {
 			// we only retry on an HTTP response with 'Retry-After' header
 			return nil, err
 		}
@@ -783,14 +749,8 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
 		done, transformErr := func() (bool, error) {
 			defer resp.Body.Close()
 
-			var retry bool
-			retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError)
-			if retry {
-				err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
-				if err == nil {
-					return false, nil
-				}
-				klog.V(4).Infof("Could not retry request - %v", err)
+			if r.retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
+				return false, nil
 			}
 			result := r.transformResponse(resp, req)
 			if err := result.Error(); err != nil {
@@ -881,23 +841,29 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
 		defer cancel()
 	}
 
+	isErrRetryableFunc := func(req *http.Request, err error) bool {
+		// "Connection reset by peer" or "apiserver is shutting down" are usually transient errors.
+		// Thus, in case of "GET" operations, we simply retry it.
+		// We are not automatically retrying "write" operations, as they are not idempotent.
+		if req.Method != "GET" {
+			return false
+		}
+		// For connection errors and apiserver shutdown errors retry.
+		if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
+			return true
+		}
+		return false
+	}
+
 	// Right now we make about ten retry attempts if we get a Retry-After response.
-	var retryAfter *RetryAfter
 	for {
 		req, err := r.newHTTPRequest(ctx)
 		if err != nil {
 			return err
 		}
 
-		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
-		if retryAfter != nil {
-			// We are retrying the request that we already send to apiserver
-			// at least once before.
-			// This request should also be throttled with the client-internal rate limiter.
-			if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
-				return err
-			}
-			retryAfter = nil
+		if err := r.retry.Before(ctx, r); err != nil {
+			return err
 		}
 
 		resp, err := client.Do(req)
 		updateURLMetrics(ctx, r, resp, err)
@@ -906,11 +872,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
 		if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
 			metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
 		}
-		if err != nil {
-			r.backoff.UpdateBackoff(r.URL(), err, 0)
-		} else {
-			r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
-		}
+		r.retry.After(ctx, r, resp, err)
 
 		done := func() bool {
 			defer readAndCloseResponseBody(resp)
@@ -923,26 +885,8 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
 				fn(req, resp)
 			}
 
-			var retry bool
-			retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
-				// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
-				// Thus in case of "GET" operations, we simply retry it.
-				// We are not automatically retrying "write" operations, as they are not idempotent.
-				if r.verb != "GET" {
-					return false
-				}
-				// For connection errors and apiserver shutdown errors retry.
-				if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
-					return true
-				}
+			if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
 				return false
-			})
-			if retry {
-				err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
-				if err == nil {
-					return false
-				}
-				klog.V(4).Infof("Could not retry request - %v", err)
 			}
 
 			f(req, resp)
diff --git a/rest/request_test.go b/rest/request_test.go
index c69b04dc..e2f6e81f 100644
--- a/rest/request_test.go
+++ b/rest/request_test.go
@@ -2584,6 +2584,34 @@ func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
 	})
 }
 
+func TestRequestDoWithRetryInvokeOrder(t *testing.T) {
+	// both request.Do and request.DoRaw have the same behavior and expectations
+	testWithRetryInvokeOrder(t, "Do", func(ctx context.Context, r *Request) {
+		r.DoRaw(ctx)
+	})
+}
+
+func TestRequestStreamWithRetryInvokeOrder(t *testing.T) {
+	testWithRetryInvokeOrder(t, "Stream", func(ctx context.Context, r *Request) {
+		r.Stream(ctx)
+	})
+}
+
+func TestRequestWatchWithRetryInvokeOrder(t *testing.T) {
+	testWithRetryInvokeOrder(t, "Watch", func(ctx context.Context, r *Request) {
+		w, err := r.Watch(ctx)
+		if err == nil {
+			// in this test the response body returned by the server is always empty,
+			// this will cause StreamWatcher.receive() to:
+			// - return an io.EOF to indicate that the watch closed normally and
+			// - then close the io.Reader
+			// since we assert on the number of times 'Close' has been called on the
+			// body of the response object, we need to wait here to avoid a race condition.
+			<-w.ResultChan()
+		}
+	})
+}
+
 func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
 	type expected struct {
 		attempts int
@@ -2968,6 +2996,134 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
 	}
 }
 
+type retryInterceptor struct {
+	WithRetry
+	invokeOrderGot []string
+}
+
+func (ri *retryInterceptor) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
+	ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.IsNextRetry")
+	return ri.WithRetry.IsNextRetry(ctx, restReq, httpReq, resp, err, f)
+}
+
+func (ri *retryInterceptor) Before(ctx context.Context, request *Request) error {
+	ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.Before")
+	return ri.WithRetry.Before(ctx, request)
+}
+
+func (ri *retryInterceptor) After(ctx context.Context, request *Request, resp *http.Response, err error) {
+	ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.After")
+	ri.WithRetry.After(ctx, request, resp, err)
+}
+
+func (ri *retryInterceptor) Do() {
+	ri.invokeOrderGot = append(ri.invokeOrderGot, "Client.Do")
+}
+
+func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
+	// we define the expected order in which the client
+	// should invoke the retry interface.
+	// scenario:
+	//  - A: original request fails with a retryable response: (500, 'Retry-After: 1')
+	//  - B: retry 1: successful with a status code 200
+	// so we have a total of 2 attempts
+	defaultInvokeOrderWant := []string{
+		// first attempt (A)
+		"WithRetry.Before",
+		"Client.Do",
+		"WithRetry.After",
+
+		// server returns a retryable response: (500, 'Retry-After: 1')
+		// IsNextRetry is expected to return true
+		"WithRetry.IsNextRetry",
+
+		// second attempt (B) - retry 1: successful with a status code 200
+		"WithRetry.Before",
+		"Client.Do",
+		"WithRetry.After",
+		// success: IsNextRetry is expected to return false
+		// Watch and Stream are an exception, they return as soon as the
+		// server sends a status code of success.
+		"WithRetry.IsNextRetry",
+	}
+
+	tests := []struct {
+		name          string
+		maxRetries    int
+		serverReturns []responseErr
+		// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
+		expectations map[string][]string
+	}{
+		{
+			name:       "success after one retry",
+			maxRetries: 1,
+			serverReturns: []responseErr{
+				{response: retryAfterResponse(), err: nil},
+				{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
+			},
+			expectations: map[string][]string{
+				"Do": defaultInvokeOrderWant,
+				// Watch and Stream skip the final 'IsNextRetry' by returning
+				// as soon as they see a success from the server.
+				"Watch":  defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
+				"Stream": defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			interceptor := &retryInterceptor{
+				WithRetry: &withRetry{maxRetries: test.maxRetries},
+			}
+
+			var attempts int
+			client := clientForFunc(func(req *http.Request) (*http.Response, error) {
+				defer func() {
+					attempts++
+				}()
+
+				interceptor.Do()
+				resp := test.serverReturns[attempts].response
+				if resp != nil {
+					resp.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
+				}
+				return resp, test.serverReturns[attempts].err
+			})
+
+			base, err := url.Parse("http://foo.bar")
+			if err != nil {
+				t.Fatalf("Wrong test setup - failed to parse the base URL: %v", err)
+			}
+			req := &Request{
+				verb: "GET",
+				body: bytes.NewReader([]byte{}),
+				c: &RESTClient{
+					base:    base,
+					content: defaultContentConfig(),
+					Client:  client,
+				},
+				pathPrefix:  "/api/v1",
+				rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
+				backoff:     &NoBackoff{},
+				retry:       interceptor,
+			}
+
+			doFunc(context.Background(), req)
+
+			if attempts != 2 {
+				t.Errorf("%s: Expected attempts: %d, but got: %d", key, 2, attempts)
+			}
+			invokeOrderWant, ok := test.expectations[key]
+			if !ok {
+				t.Fatalf("Wrong test setup - did not find expected for: %s", key)
+			}
+			if !cmp.Equal(invokeOrderWant, interceptor.invokeOrderGot) {
+				t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(invokeOrderWant, interceptor.invokeOrderGot))
+			}
+		})
+	}
+}
+
 func TestReuseRequest(t *testing.T) {
 	var tests = []struct {
 		name string
diff --git a/rest/with_retry.go b/rest/with_retry.go
index 1b7360b5..11b9b522 100644
--- a/rest/with_retry.go
+++ b/rest/with_retry.go
@@ -57,36 +57,32 @@ type WithRetry interface {
 	// A zero maxRetries should prevent from doing any retry and return immediately.
 	SetMaxRetries(maxRetries int)
 
-	// NextRetry advances the retry counter appropriately and returns true if the
-	// request should be retried, otherwise it returns false if:
+	// IsNextRetry advances the retry counter appropriately
+	// and returns true if the request should be retried,
+	// otherwise it returns false if:
 	//  - we have already reached the maximum retry threshold.
 	//  - the error does not fall into the retryable category.
 	//  - the server has not sent us a 429, or 5xx status code and the
 	//    'Retry-After' response header is not set with a value.
+	//  - we need to seek to the beginning of the request body before we
+	//    initiate the next retry; the function should log an error and
+	//    return false if it fails to do so.
 	//
-	// if retry is set to true, retryAfter will contain the information
-	// regarding the next retry.
-	//
-	// request: the original request sent to the server
+	// restReq: the associated rest.Request
+	// httpReq: the HTTP Request sent to the server
 	// resp: the response sent from the server, it is set if err is nil
 	// err: the server sent this error to us, if err is set then resp is nil.
 	// f: a IsRetryableErrorFunc function provided by the client that determines
 	//    if the err sent by the server is retryable.
-	NextRetry(req *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) (*RetryAfter, bool)
+	IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool
 
-	// BeforeNextRetry is responsible for carrying out operations that need
-	// to be completed before the next retry is initiated:
-	// - if the request context is already canceled there is no need to
-	//   retry, the function will return ctx.Err().
-	// - we need to seek to the beginning of the request body before we
-	//   initiate the next retry, the function should return an error if
-	//   it fails to do so.
-	// - we should wait the number of seconds the server has asked us to
-	//   in the 'Retry-After' response header.
-	//
-	// If BeforeNextRetry returns an error the client should abort the retry,
-	// otherwise it is safe to initiate the next retry.
-	BeforeNextRetry(ctx context.Context, backoff BackoffManager, retryAfter *RetryAfter, url string, body io.Reader) error
+	// Before should be invoked prior to each attempt, including
+	// the first one. If an error is returned, the request
+	// should be aborted immediately.
+	Before(ctx context.Context, r *Request) error
+
+	// After should be invoked immediately after an attempt is made.
+	After(ctx context.Context, r *Request, resp *http.Response, err error)
 }
 
 // RetryAfter holds information associated with the next retry.
@@ -107,6 +103,14 @@ type RetryAfter struct {
 type withRetry struct {
 	maxRetries int
 	attempts   int
+
+	// retry after parameters that pertain to the attempt that is to
+	// be made soon, so as to enable 'Before' and 'After' to refer
+	// to the retry parameters.
+	//  - for the first attempt, it will always be nil
+	//  - for consecutive attempts, it is non-nil and holds the
+	//    retry after parameters for the next attempt to be made.
+	retryAfter *RetryAfter
 }
 
 func (r *withRetry) SetMaxRetries(maxRetries int) {
@@ -116,28 +120,28 @@ func (r *withRetry) SetMaxRetries(maxRetries int) {
 	r.maxRetries = maxRetries
 }
 
-func (r *withRetry) NextRetry(req *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) (*RetryAfter, bool) {
-	if req == nil || (resp == nil && err == nil) {
+func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
+	if httpReq == nil || (resp == nil && err == nil) {
 		// bad input, we do nothing.
-		return nil, false
+		return false
 	}
 
 	r.attempts++
-	retryAfter := &RetryAfter{Attempt: r.attempts}
+	r.retryAfter = &RetryAfter{Attempt: r.attempts}
 	if r.attempts > r.maxRetries {
-		return retryAfter, false
+		return false
 	}
 
 	// if the server returned an error, it takes precedence over the http response.
 	var errIsRetryable bool
-	if f != nil && err != nil && f.IsErrorRetryable(req, err) {
+	if f != nil && err != nil && f.IsErrorRetryable(httpReq, err) {
 		errIsRetryable = true
 		// we have a retryable error, for which we will create an
 		// artificial "Retry-After" response.
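+		// the artificial response is a 500 carrying a 'Retry-After: 1'
+		// header (see retryAfterResponse), so a retryable error flows
+		// through the same checkWait/getRetryReason path below as a
+		// server-instructed retry.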
 		resp = retryAfterResponse()
 	}
 	if err != nil && !errIsRetryable {
-		return retryAfter, false
+		return false
 	}
 
 	// if we are here, we have either a or b:
@@ -147,34 +151,100 @@ func (r *withRetry) NextRetry(req *http.Request, resp *http.Response, err error,
 	// need to check if it is retryable
 	seconds, wait := checkWait(resp)
 	if !wait {
-		return retryAfter, false
+		return false
 	}
-	retryAfter.Wait = time.Duration(seconds) * time.Second
-	retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
-	return retryAfter, true
+	r.retryAfter.Wait = time.Duration(seconds) * time.Second
+	r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
+
+	if err := r.prepareForNextRetry(ctx, restReq); err != nil {
+		klog.V(4).Infof("Could not retry request - %v", err)
+		return false
+	}
+
+	return true
 }
 
-func (r *withRetry) BeforeNextRetry(ctx context.Context, backoff BackoffManager, retryAfter *RetryAfter, url string, body io.Reader) error {
-	// Ensure the response body is fully read and closed before
-	// we reconnect, so that we reuse the same TCP connection.
+// prepareForNextRetry is responsible for carrying out operations that need
+// to be completed before the next retry is initiated:
+// - if the request context is already canceled there is no need to
+//   retry; the function will return ctx.Err().
+// - we need to seek to the beginning of the request body before we
+//   initiate the next retry; the function should return an error if
+//   it fails to do so.
func (r *withRetry) prepareForNextRetry(ctx context.Context, request *Request) error {
 	if ctx.Err() != nil {
 		return ctx.Err()
 	}
 
-	if seeker, ok := body.(io.Seeker); ok && body != nil {
+	// Ensure the response body is fully read and closed before
+	// we reconnect, so that we reuse the same TCP connection.
+	if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
 		if _, err := seeker.Seek(0, 0); err != nil {
-			return fmt.Errorf("can't Seek() back to beginning of body for %T", r)
+			return fmt.Errorf("can't Seek() back to beginning of body for %T", request)
 		}
 	}
 
-	klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", retryAfter.Wait, retryAfter.Attempt, url)
-	if backoff != nil {
-		backoff.Sleep(retryAfter.Wait)
-	}
+	klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
 	return nil
 }
 
+func (r *withRetry) Before(ctx context.Context, request *Request) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+
+	url := request.URL()
+
+	// r.retryAfter represents the retry after parameters calculated
+	// from the (response, err) tuple from the last attempt, so 'Before'
+	// can apply these retry after parameters prior to the next attempt.
+	// 'r.retryAfter == nil' indicates that this is the very first attempt.
+	if r.retryAfter == nil {
+		// we do a backoff sleep before the first attempt is made
+		// (preserving current behavior).
+		request.backoff.Sleep(request.backoff.CalculateBackoff(url))
+		return nil
+	}
+
+	// if we are here, we have made at least one attempt before.
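+	// the ordering below is: the server-provided 'Retry-After' wait first,
+	// then the client-side backoff, and finally the client-internal rate
+	// limiter, so the next attempt never fires earlier than the server
+	// asked us to wait; the caller drives each attempt roughly as:
+	//   Before() -> Client.Do() -> After() -> IsNextRetry()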
+	if request.backoff != nil {
+		// TODO(tkashem) with default set to use exponential backoff
+		// we can merge these two sleeps:
+		//  BackOffManager.Sleep(max(backoffManager.CalculateBackoff(), retryAfter))
+		// see https://github.com/kubernetes/kubernetes/issues/108302
+		request.backoff.Sleep(r.retryAfter.Wait)
+		request.backoff.Sleep(request.backoff.CalculateBackoff(url))
+	}
+
+	// We are retrying the request that we already sent to the
+	// apiserver at least once before. This request should
+	// also be throttled with the client-internal rate limiter.
+	if err := request.tryThrottleWithInfo(ctx, r.retryAfter.Reason); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Response, err error) {
+	// 'After' is invoked immediately after an attempt is made; let's label
+	// the attempt we have just made as attempt 'N'.
+	// The current value of r.retryAfter represents the retry after
+	// parameters calculated from the (response, err) tuple from
+	// attempt N-1, so r.retryAfter is outdated and should not be
+	// referred to here.
+	r.retryAfter = nil
+
+	if request.c.base != nil {
+		if err != nil {
+			request.backoff.UpdateBackoff(request.URL(), err, 0)
+		} else {
+			request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode)
+		}
+	}
+}
+
 // checkWait returns true along with a number of seconds if
 // the server instructed us to wait before retrying.
 func checkWait(resp *http.Response) (int, bool) {
diff --git a/rest/with_retry_test.go b/rest/with_retry_test.go
index 25b9016d..f9f495ff 100644
--- a/rest/with_retry_test.go
+++ b/rest/with_retry_test.go
@@ -17,8 +17,11 @@ limitations under the License.
 package rest
 
 import (
+	"bytes"
+	"context"
 	"errors"
 	"net/http"
+	"net/url"
 	"reflect"
 	"testing"
 	"time"
@@ -30,7 +33,7 @@ var alwaysRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool
 	return true
 })
 
-func TestNextRetry(t *testing.T) {
+func TestIsNextRetry(t *testing.T) {
 	fakeError := errors.New("fake error")
 	tests := []struct {
 		name string
@@ -205,14 +208,20 @@
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			restReq := &Request{
+				body: bytes.NewReader([]byte{}),
+				c: &RESTClient{
+					base: &url.URL{},
+				},
+			}
 			r := &withRetry{maxRetries: test.maxRetries}
 
 			retryGot := make([]bool, 0)
 			retryAfterGot := make([]*RetryAfter, 0)
 			for i := 0; i < test.attempts; i++ {
-				retryAfter, retry := r.NextRetry(test.request, test.response, test.err, test.retryableErrFunc)
+				retry := r.IsNextRetry(context.TODO(), restReq, test.request, test.response, test.err, test.retryableErrFunc)
 				retryGot = append(retryGot, retry)
-				retryAfterGot = append(retryAfterGot, retryAfter)
+				retryAfterGot = append(retryAfterGot, r.retryAfter)
 			}
 
 			if !reflect.DeepEqual(test.retryExpected, retryGot) {