From 7763f75022edd7116338d3bd56ca8b6c5427c069 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Tue, 29 Mar 2022 13:09:26 -0400 Subject: [PATCH] client-go: make retry in Request thread safe Kubernetes-commit: 091f4f00395272e23a777d6bf068d67793bf8931 --- rest/request.go | 38 +++++++++++++++++------- rest/request_test.go | 71 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 89 insertions(+), 20 deletions(-) diff --git a/rest/request.go b/rest/request.go index 5cc9900b0..2061fb543 100644 --- a/rest/request.go +++ b/rest/request.go @@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string { var noBackoff = &NoBackoff{} +type requestRetryFunc func(maxRetries int) WithRetry + +func defaultRequestRetryFn(maxRetries int) WithRetry { + return &withRetry{maxRetries: maxRetries} +} + // Request allows for building up a request to a server in a chained fashion. // Any errors are stored until the end of your call, so you only have to // check once. @@ -93,6 +99,7 @@ type Request struct { rateLimiter flowcontrol.RateLimiter backoff BackoffManager timeout time.Duration + maxRetries int // generic components accessible via method setters verb string @@ -109,9 +116,10 @@ type Request struct { subresource string // output - err error - body io.Reader - retry WithRetry + err error + body io.Reader + + retryFn requestRetryFunc } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. @@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request { backoff: backoff, timeout: timeout, pathPrefix: pathPrefix, - retry: &withRetry{maxRetries: 10}, + maxRetries: 10, + retryFn: defaultRequestRetryFn, warningHandler: c.warningHandler, } @@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request { // function is specifically called with a different value. // A zero maxRetries prevent it from doing retires and return an error immediately. func (r *Request) MaxRetries(maxRetries int) *Request { - r.retry.SetMaxRetries(maxRetries) + if maxRetries < 0 { + maxRetries = 0 + } + r.maxRetries = maxRetries return r } @@ -688,8 +700,10 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { } return false } + var retryAfter *RetryAfter url := r.URL().String() + withRetry := r.retryFn(r.maxRetries) for { req, err := r.newHTTPRequest(ctx) if err != nil { @@ -724,9 +738,9 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { defer readAndCloseResponseBody(resp) var retry bool - retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc) + retryAfter, retry = withRetry.NextRetry(req, resp, err, isErrRetryableFunc) if retry { - err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) + err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) if err == nil { return false, nil } @@ -817,6 +831,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { } var retryAfter *RetryAfter + withRetry := r.retryFn(r.maxRetries) url := r.URL().String() for { req, err := r.newHTTPRequest(ctx) @@ -862,9 +877,9 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { defer resp.Body.Close() var retry bool - retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError) + retryAfter, retry = withRetry.NextRetry(req, resp, err, neverRetryError) if retry { - err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) + err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) if err == nil { return false, nil } @@ -961,6 +976,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp // Right now we make about ten retry attempts if we get a Retry-After response. var retryAfter *RetryAfter + withRetry := r.retryFn(r.maxRetries) for { req, err := r.newHTTPRequest(ctx) if err != nil { @@ -997,7 +1013,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp } var retry bool - retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool { + retryAfter, retry = withRetry.NextRetry(req, resp, err, func(req *http.Request, err error) bool { // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors. // Thus in case of "GET" operations, we simply retry it. // We are not automatically retrying "write" operations, as they are not idempotent. @@ -1011,7 +1027,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp return false }) if retry { - err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body) + err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body) if err == nil { return false } diff --git a/rest/request_test.go b/rest/request_test.go index 8e9bb92b5..dc1c4e51a 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -32,6 +32,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "syscall" "testing" "time" @@ -1194,7 +1195,8 @@ func TestRequestWatch(t *testing.T) { c.Client = client } testCase.Request.backoff = &noSleepBackOff{} - testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries} + testCase.Request.maxRetries = testCase.maxRetries + testCase.Request.retryFn = defaultRequestRetryFn watch, err := testCase.Request.Watch(context.Background()) @@ -1407,7 +1409,8 @@ func TestRequestStream(t *testing.T) { c.Client = client } testCase.Request.backoff = &noSleepBackOff{} - testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries} + testCase.Request.maxRetries = testCase.maxRetries + testCase.Request.retryFn = defaultRequestRetryFn body, err := testCase.Request.Stream(context.Background()) @@ -1462,7 +1465,7 @@ func TestRequestDo(t *testing.T) { } for i, testCase := range testCases { testCase.Request.backoff = &NoBackoff{} - testCase.Request.retry = &withRetry{} + testCase.Request.retryFn = defaultRequestRetryFn body, err := testCase.Request.Do(context.Background()).Raw() hasErr := err != nil if hasErr != testCase.Err { @@ -1625,8 +1628,9 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) { return nil, &net.OpError{Err: syscall.ECONNRESET} }), }, - backoff: backoff, - retry: &withRetry{maxRetries: 10}, + backoff: backoff, + maxRetries: 10, + retryFn: defaultRequestRetryFn, } // We expect two retries of "connection reset by peer" and the success. _, err := req.Do(context.Background()).Raw() @@ -2699,8 +2703,9 @@ func TestRequestWithRetry(t *testing.T) { c: &RESTClient{ Client: client, }, - backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: 1}, + backoff: &noSleepBackOff{}, + maxRetries: 1, + retryFn: defaultRequestRetryFn, } var transformFuncInvoked int @@ -2890,8 +2895,9 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont content: defaultContentConfig(), Client: client, }, - backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: test.maxRetries}, + backoff: &noSleepBackOff{}, + maxRetries: test.maxRetries, + retryFn: defaultRequestRetryFn, } doFunc(context.Background(), req) @@ -3093,3 +3099,50 @@ func TestTransportConcurrency(t *testing.T) { }) } } + +func TestRequestConcurrencyWithRetry(t *testing.T) { + var attempts int32 + client := clientForFunc(func(req *http.Request) (*http.Response, error) { + defer func() { + atomic.AddInt32(&attempts, 1) + }() + + // always send a retry-after response + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Header: http.Header{"Retry-After": []string{"1"}}, + }, nil + }) + + req := &Request{ + verb: "POST", + c: &RESTClient{ + content: defaultContentConfig(), + Client: client, + }, + backoff: &noSleepBackOff{}, + maxRetries: 9, // 10 attempts in total, including the first + retryFn: defaultRequestRetryFn, + } + + concurrency := 20 + wg := sync.WaitGroup{} + wg.Add(concurrency) + startCh := make(chan struct{}) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + <-startCh + req.Do(context.Background()) + }() + } + + close(startCh) + wg.Wait() + + // we expect (concurrency*req.maxRetries+1) attempts to be recorded + expected := concurrency * (req.maxRetries + 1) + if atomic.LoadInt32(&attempts) != int32(expected) { + t.Errorf("Expected attempts: %d, but got: %d", expected, attempts) + } +}