client-go: refactor retry logic for backoff, rate limiter and metric

Kubernetes-commit: cecc563d3b9a9438cd3e6ae1576baa0a36f2d843
Author: Abu Kashem, 2022-02-17 16:57:45 -05:00 (committed by Kubernetes Publisher)
parent 8e46da3fd1
commit 34f3aff43e
4 changed files with 306 additions and 127 deletions
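
The heart of the change: the per-attempt bookkeeping that Watch, Stream, and request each re-implemented (the backoff sleep, the client-side throttling on retries, and the backoff updates) moves behind the WithRetry interface as Before/After hooks, and NextRetry/BeforeNextRetry collapse into a single IsNextRetry. A minimal sketch of the loop shape all three call sites now share (Before, After, and IsNextRetry are the interface methods introduced here; the surrounding glue is simplified, not the literal client-go code):

	for {
		req, err := r.newHTTPRequest(ctx)
		if err != nil {
			return err
		}
		// Before sleeps for the client backoff and, on retries, waits out
		// the server's Retry-After and re-applies client-side throttling.
		if err := r.retry.Before(ctx, r); err != nil {
			return err
		}
		resp, err := client.Do(req)
		// After clears the per-attempt retry state and feeds the outcome
		// (status code or error) back into the backoff manager.
		r.retry.After(ctx, r, resp, err)
		// IsNextRetry decides whether another attempt is warranted and,
		// if so, seeks the request body back to the beginning.
		if !r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
			break // process the final (resp, err) and return
		}
	}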

rest/request.go

@@ -610,7 +610,6 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
return false
}
var retryAfter *RetryAfter
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
@@ -618,26 +617,13 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
return nil, err
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we have already sent to the
// apiserver at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return nil, err
}
retryAfter = nil
if err := r.retry.Before(ctx, r); err != nil {
return nil, err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
if err != nil {
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
r.retry.After(ctx, r, resp, err)
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
}
@@ -645,14 +631,8 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
done, transformErr := func() (bool, error) {
defer readAndCloseResponseBody(resp)
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
klog.V(4).Infof("Could not retry request - %v", err)
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false, nil
}
if resp == nil {
@@ -738,7 +718,6 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
client = http.DefaultClient
}
var retryAfter *RetryAfter
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
@@ -749,26 +728,13 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
req.Body = ioutil.NopCloser(r.body)
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we have already sent to the
// apiserver at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return nil, err
}
retryAfter = nil
if err := r.retry.Before(ctx, r); err != nil {
return nil, err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
if err != nil {
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
}
r.retry.After(ctx, r, resp, err)
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
return nil, err
@@ -783,14 +749,8 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
done, transformErr := func() (bool, error) {
defer resp.Body.Close()
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
klog.V(4).Infof("Could not retry request - %v", err)
if r.retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
return false, nil
}
result := r.transformResponse(resp, req)
if err := result.Error(); err != nil {
@@ -881,23 +841,29 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
defer cancel()
}
isErrRetryableFunc := func(req *http.Request, err error) bool {
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as they are not idempotent.
if req.Method != "GET" {
return false
}
// For connection errors and apiserver shutdown errors, retry.
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
return true
}
return false
}
// Right now we make about ten retry attempts if we get a Retry-After response.
var retryAfter *RetryAfter
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
return err
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we have already sent to the
// apiserver at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return err
}
retryAfter = nil
if err := r.retry.Before(ctx, r); err != nil {
return err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
@@ -906,11 +872,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
}
if err != nil {
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
r.retry.After(ctx, r, resp, err)
done := func() bool {
defer readAndCloseResponseBody(resp)
@@ -923,26 +885,8 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
fn(req, resp)
}
var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as they are not idempotent.
if r.verb != "GET" {
return false
}
// For connection errors and apiserver shutdown errors retry.
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
return true
}
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false
})
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
if err == nil {
return false
}
klog.V(4).Infof("Could not retry request - %v", err)
}
f(req, resp)

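None of this changes the public surface of rest.Request: callers still opt into retries per request, and the Before/Do/After/IsNextRetry choreography above runs underneath. For instance (illustrative usage; MaxRetries is the existing setter that feeds WithRetry.SetMaxRetries):

	// Up to 3 retries on a GET; retryable failures (429/5xx carrying a
	// Retry-After header, connection resets) are retried transparently.
	result := client.Get().
		AbsPath("/api/v1/namespaces").
		MaxRetries(3).
		Do(ctx)
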
rest/request_test.go

@@ -2584,6 +2584,34 @@ func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
})
}
func TestRequestDoWithRetryInvokeOrder(t *testing.T) {
// both request.Do and request.DoRaw have the same behavior and expectations
testWithRetryInvokeOrder(t, "Do", func(ctx context.Context, r *Request) {
r.DoRaw(ctx)
})
}
func TestRequestStreamWithRetryInvokeOrder(t *testing.T) {
testWithRetryInvokeOrder(t, "Stream", func(ctx context.Context, r *Request) {
r.Stream(ctx)
})
}
func TestRequestWatchWithRetryInvokeOrder(t *testing.T) {
testWithRetryInvokeOrder(t, "Watch", func(ctx context.Context, r *Request) {
w, err := r.Watch(ctx)
if err == nil {
// in this test the response body returned by the server is always empty,
// this will cause StreamWatcher.receive() to:
// - return an io.EOF to indicate that the watch closed normally and
// - then close the io.Reader
// since we assert on the number of times 'Close' has been called on the
// body of the response object, we need to wait here to avoid a race condition.
<-w.ResultChan()
}
})
}
func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
type expected struct {
attempts int
@@ -2968,6 +2996,134 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
}
}
type retryInterceptor struct {
WithRetry
invokeOrderGot []string
}
func (ri *retryInterceptor) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.IsNextRetry")
return ri.WithRetry.IsNextRetry(ctx, restReq, httpReq, resp, err, f)
}
func (ri *retryInterceptor) Before(ctx context.Context, request *Request) error {
ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.Before")
return ri.WithRetry.Before(ctx, request)
}
func (ri *retryInterceptor) After(ctx context.Context, request *Request, resp *http.Response, err error) {
ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.After")
ri.WithRetry.After(ctx, request, resp, err)
}
func (ri *retryInterceptor) Do() {
ri.invokeOrderGot = append(ri.invokeOrderGot, "Client.Do")
}
func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
// we define the expected order of how the client
// should invoke the retry interface
// scenario:
// - A: original request fails with a retryable response: (500, 'Retry-After: 1')
// - B: retry 1: successful with a status code 200
// so we have a total of 2 attempts
defaultInvokeOrderWant := []string{
// first attempt (A)
"WithRetry.Before",
"Client.Do",
"WithRetry.After",
// server returns a retryable response: (500, 'Retry-After: 1')
// IsNextRetry is expected to return true
"WithRetry.IsNextRetry",
// second attempt (B) - retry 1: successful with a status code 200
"WithRetry.Before",
"Client.Do",
"WithRetry.After",
// success: IsNextRetry is expected to return false
// Watch and Stream are an exception: they return as soon as the
// server sends a success status code.
"WithRetry.IsNextRetry",
}
tests := []struct {
name string
maxRetries int
serverReturns []responseErr
// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
expectations map[string][]string
}{
{
name: "success after one retry",
maxRetries: 1,
serverReturns: []responseErr{
{response: retryAfterResponse(), err: nil},
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
},
expectations: map[string][]string{
"Do": defaultInvokeOrderWant,
// Watch and Stream skip the final 'IsNextRetry' by returning
// as soon as they see a success from the server.
"Watch": defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
"Stream": defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
interceptor := &retryInterceptor{
WithRetry: &withRetry{maxRetries: test.maxRetries},
}
var attempts int
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
attempts++
}()
interceptor.Do()
resp := test.serverReturns[attempts].response
if resp != nil {
resp.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
}
return resp, test.serverReturns[attempts].err
})
base, err := url.Parse("http://foo.bar")
if err != nil {
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
}
req := &Request{
verb: "GET",
body: bytes.NewReader([]byte{}),
c: &RESTClient{
base: base,
content: defaultContentConfig(),
Client: client,
},
pathPrefix: "/api/v1",
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
backoff: &NoBackoff{},
retry: interceptor,
}
doFunc(context.Background(), req)
if attempts != 2 {
t.Errorf("%s: Expected attempts: %d, but got: %d", key, 2, attempts)
}
invokeOrderWant, ok := test.expectations[key]
if !ok {
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
}
if !cmp.Equal(invokeOrderWant, interceptor.invokeOrderGot) {
t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(invokeOrderWant, interceptor.invokeOrderGot))
}
})
}
}
func TestReuseRequest(t *testing.T) {
var tests = []struct {
name string

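The tests above lean on a retryAfterResponse() helper that this excerpt does not show; going by the "(500, 'Retry-After: 1')" comments, it presumably fabricates the minimal retryable response, roughly:

	// Assumed shape of the helper: a 500 response carrying a
	// 'Retry-After: 1' header, which the retry logic treats as retryable.
	func retryAfterResponse() *http.Response {
		return &http.Response{
			StatusCode: http.StatusInternalServerError,
			Header:     http.Header{"Retry-After": []string{"1"}},
		}
	}
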
rest/with_retry.go

@@ -57,36 +57,32 @@ type WithRetry interface {
// A zero maxRetries prevents any retry and returns immediately.
SetMaxRetries(maxRetries int)
// NextRetry advances the retry counter appropriately and returns true if the
// request should be retried, otherwise it returns false if:
// IsNextRetry advances the retry counter appropriately
// and returns true if the request should be retried,
// otherwise it returns false if:
// - we have already reached the maximum retry threshold.
// - the error does not fall into the retryable category.
// - the server has not sent us a 429 or 5xx status code, and the
// 'Retry-After' response header is not set with a value.
// - we need to seek to the beginning of the request body before we
// initiate the next retry; the function should log an error and
// return false if it fails to do so.
//
// if retry is set to true, retryAfter will contain the information
// regarding the next retry.
//
// request: the original request sent to the server
// restReq: the associated rest.Request
// httpReq: the HTTP Request sent to the server
// resp: the response sent from the server; it is set if err is nil
// err: the error the server sent to us; if err is set then resp is nil.
// f: an IsRetryableErrorFunc function provided by the client that determines
// if the err sent by the server is retryable.
NextRetry(req *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) (*RetryAfter, bool)
IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool
// BeforeNextRetry is responsible for carrying out operations that need
// to be completed before the next retry is initiated:
// - if the request context is already canceled there is no need to
// retry, the function will return ctx.Err().
// - we need to seek to the beginning of the request body before we
// initiate the next retry, the function should return an error if
// it fails to do so.
// - we should wait the number of seconds the server has asked us to
// in the 'Retry-After' response header.
//
// If BeforeNextRetry returns an error the client should abort the retry,
// otherwise it is safe to initiate the next retry.
BeforeNextRetry(ctx context.Context, backoff BackoffManager, retryAfter *RetryAfter, url string, body io.Reader) error
// Before should be invoked prior to each attempt, including
// the first one. If an error is returned, the request
// should be aborted immediately.
Before(ctx context.Context, r *Request) error
// After should be invoked immediately after an attempt is made.
After(ctx context.Context, r *Request, resp *http.Response, err error)
}
// RetryAfter holds information associated with the next retry.
@@ -107,6 +103,14 @@ type RetryAfter struct {
type withRetry struct {
maxRetries int
attempts int
// retryAfter holds the retry-after parameters that pertain to the
// attempt that is about to be made, so that 'Before' and 'After'
// can refer to them:
// - for the first attempt, it is always nil
// - for subsequent attempts, it is non-nil and holds the
// retry-after parameters for the next attempt to be made.
retryAfter *RetryAfter
}
func (r *withRetry) SetMaxRetries(maxRetries int) {
@@ -116,28 +120,28 @@ func (r *withRetry) SetMaxRetries(maxRetries int) {
r.maxRetries = maxRetries
}
func (r *withRetry) NextRetry(req *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) (*RetryAfter, bool) {
if req == nil || (resp == nil && err == nil) {
func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
if httpReq == nil || (resp == nil && err == nil) {
// bad input, we do nothing.
return nil, false
return false
}
r.attempts++
retryAfter := &RetryAfter{Attempt: r.attempts}
r.retryAfter = &RetryAfter{Attempt: r.attempts}
if r.attempts > r.maxRetries {
return retryAfter, false
return false
}
// if the server returned an error, it takes precedence over the http response.
var errIsRetryable bool
if f != nil && err != nil && f.IsErrorRetryable(req, err) {
if f != nil && err != nil && f.IsErrorRetryable(httpReq, err) {
errIsRetryable = true
// we have a retryable error, for which we will create an
// artificial "Retry-After" response.
resp = retryAfterResponse()
}
if err != nil && !errIsRetryable {
return retryAfter, false
return false
}
// if we are here, we have either a or b:
@@ -147,34 +151,100 @@ func (r *withRetry) NextRetry(req *http.Request, resp *http.Response, err error,
// need to check if it is retryable
seconds, wait := checkWait(resp)
if !wait {
return retryAfter, false
return false
}
retryAfter.Wait = time.Duration(seconds) * time.Second
retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
return retryAfter, true
r.retryAfter.Wait = time.Duration(seconds) * time.Second
r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
if err := r.prepareForNextRetry(ctx, restReq); err != nil {
klog.V(4).Infof("Could not retry request - %v", err)
return false
}
return true
}
func (r *withRetry) BeforeNextRetry(ctx context.Context, backoff BackoffManager, retryAfter *RetryAfter, url string, body io.Reader) error {
// Ensure the response body is fully read and closed before
// we reconnect, so that we reuse the same TCP connection.
// prepareForNextRetry is responsible for carrying out operations that need
// to be completed before the next retry is initiated:
// - if the request context is already canceled, there is no need to
// retry; the function will return ctx.Err().
// - we need to seek to the beginning of the request body before we
// initiate the next retry; the function should return an error if
// it fails to do so.
func (r *withRetry) prepareForNextRetry(ctx context.Context, request *Request) error {
if ctx.Err() != nil {
return ctx.Err()
}
if seeker, ok := body.(io.Seeker); ok && body != nil {
// Ensure the response body is fully read and closed before
// we reconnect, so that we reuse the same TCP connection.
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
if _, err := seeker.Seek(0, 0); err != nil {
return fmt.Errorf("can't Seek() back to beginning of body for %T", r)
return fmt.Errorf("can't Seek() back to beginning of body for %T", request)
}
}
klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", retryAfter.Wait, retryAfter.Attempt, url)
if backoff != nil {
backoff.Sleep(retryAfter.Wait)
}
klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
return nil
}
func (r *withRetry) Before(ctx context.Context, request *Request) error {
if ctx.Err() != nil {
return ctx.Err()
}
url := request.URL()
// r.retryAfter represents the retry-after parameters calculated
// from the (response, err) tuple of the last attempt, so 'Before'
// can apply these parameters prior to the next attempt.
// 'r.retryAfter == nil' indicates that this is the very first attempt.
if r.retryAfter == nil {
// we do a backoff sleep before the first attempt is made,
// (preserving current behavior).
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
return nil
}
// if we are here, we have made at least one attempt before.
if request.backoff != nil {
// TODO(tkashem) with default set to use exponential backoff
// we can merge these two sleeps:
// BackOffManager.Sleep(max(backoffManager.CalculateBackoff(), retryAfter))
// see https://github.com/kubernetes/kubernetes/issues/108302
request.backoff.Sleep(r.retryAfter.Wait)
request.backoff.Sleep(request.backoff.CalculateBackoff(url))
}
// We are retrying the request that we have already sent to
// the apiserver at least once before. This request should
// also be throttled with the client-internal rate limiter.
if err := request.tryThrottleWithInfo(ctx, r.retryAfter.Reason); err != nil {
return err
}
return nil
}
func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Response, err error) {
// 'After' is invoked immediately after an attempt is made; let's label
// the attempt we have just made as attempt 'N'.
// The current value of r.retryAfter represents the retry-after
// parameters calculated from the (response, err) tuple from
// attempt N-1, so r.retryAfter is outdated and should not be
// referred to here.
r.retryAfter = nil
if request.c.base != nil {
if err != nil {
request.backoff.UpdateBackoff(request.URL(), err, 0)
} else {
request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode)
}
}
}
// checkWait returns true along with a number of seconds if
// the server instructed us to wait before retrying.
func checkWait(resp *http.Response) (int, bool) {

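The hunk ends at checkWait's signature; its body is untouched by this commit. For context, the pre-existing logic amounts to roughly the following (a sketch; retryAfterSeconds is assumed to be the helper that parses the header value into seconds):

	func checkWait(resp *http.Response) (int, bool) {
		switch r := resp.StatusCode; {
		// only a 429 or any 5xx status code can trigger a wait
		case r == http.StatusTooManyRequests, r >= 500:
		default:
			return 0, false
		}
		i, ok := retryAfterSeconds(resp)
		return i, ok
	}
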
rest/with_retry_test.go

@@ -17,8 +17,11 @@ limitations under the License.
package rest
import (
"bytes"
"context"
"errors"
"net/http"
"net/url"
"reflect"
"testing"
"time"
@@ -30,7 +33,7 @@ var alwaysRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool
return true
})
func TestNextRetry(t *testing.T) {
func TestIsNextRetry(t *testing.T) {
fakeError := errors.New("fake error")
tests := []struct {
name string
@@ -205,14 +208,20 @@ func TestNextRetry(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
restReq := &Request{
body: bytes.NewReader([]byte{}),
c: &RESTClient{
base: &url.URL{},
},
}
r := &withRetry{maxRetries: test.maxRetries}
retryGot := make([]bool, 0)
retryAfterGot := make([]*RetryAfter, 0)
for i := 0; i < test.attempts; i++ {
retryAfter, retry := r.NextRetry(test.request, test.response, test.err, test.retryableErrFunc)
retry := r.IsNextRetry(context.TODO(), restReq, test.request, test.response, test.err, test.retryableErrFunc)
retryGot = append(retryGot, retry)
retryAfterGot = append(retryAfterGot, retryAfter)
retryAfterGot = append(retryAfterGot, r.retryAfter)
}
if !reflect.DeepEqual(test.retryExpected, retryGot) {