From 6618b8ef7c0b552839555d4578b64427d20524ef Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Tue, 29 Mar 2022 13:09:26 -0400 Subject: [PATCH] client-go: make retry in Request thread safe --- staging/src/k8s.io/client-go/rest/request.go | 52 ++++++++++++------- .../src/k8s.io/client-go/rest/request_test.go | 36 ++++++++----- .../src/k8s.io/client-go/rest/with_retry.go | 13 ----- 3 files changed, 56 insertions(+), 45 deletions(-) diff --git a/staging/src/k8s.io/client-go/rest/request.go b/staging/src/k8s.io/client-go/rest/request.go index ab9348d9ccb..3a1560df0dc 100644 --- a/staging/src/k8s.io/client-go/rest/request.go +++ b/staging/src/k8s.io/client-go/rest/request.go @@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string { var noBackoff = &NoBackoff{} +type requestRetryFunc func(maxRetries int) WithRetry + +func defaultRequestRetryFn(maxRetries int) WithRetry { + return &withRetry{maxRetries: maxRetries} +} + // Request allows for building up a request to a server in a chained fashion. // Any errors are stored until the end of your call, so you only have to // check once. @@ -93,6 +99,7 @@ type Request struct { rateLimiter flowcontrol.RateLimiter backoff BackoffManager timeout time.Duration + maxRetries int // generic components accessible via method setters verb string @@ -109,9 +116,10 @@ type Request struct { subresource string // output - err error - body io.Reader - retry WithRetry + err error + body io.Reader + + retryFn requestRetryFunc } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. @@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request { backoff: backoff, timeout: timeout, pathPrefix: pathPrefix, - retry: &withRetry{maxRetries: 10}, + maxRetries: 10, + retryFn: defaultRequestRetryFn, warningHandler: c.warningHandler, } @@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request { // function is specifically called with a different value. // A zero maxRetries prevent it from doing retires and return an error immediately. func (r *Request) MaxRetries(maxRetries int) *Request { - r.retry.SetMaxRetries(maxRetries) + if maxRetries < 0 { + maxRetries = 0 + } + r.maxRetries = maxRetries return r } @@ -612,19 +624,21 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { } return false } + retry := r.retryFn(r.maxRetries) url := r.URL().String() for { - if err := r.retry.Before(ctx, r); err != nil { - return nil, r.retry.WrapPreviousError(err) + if err := retry.Before(ctx, r); err != nil { + return nil, retry.WrapPreviousError(err) } req, err := r.newHTTPRequest(ctx) if err != nil { return nil, err } + resp, err := client.Do(req) updateURLMetrics(ctx, r, resp, err) - r.retry.After(ctx, r, resp, err) + retry.After(ctx, r, resp, err) if err == nil && resp.StatusCode == http.StatusOK { return r.newStreamWatcher(resp) } @@ -632,7 +646,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { done, transformErr := func() (bool, error) { defer readAndCloseResponseBody(resp) - if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) { + if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) { return false, nil } @@ -654,7 +668,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { // we need to return the error object from that. err = transformErr } - return nil, r.retry.WrapPreviousError(err) + return nil, retry.WrapPreviousError(err) } } } @@ -719,9 +733,10 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { client = http.DefaultClient } + retry := r.retryFn(r.maxRetries) url := r.URL().String() for { - if err := r.retry.Before(ctx, r); err != nil { + if err := retry.Before(ctx, r); err != nil { return nil, err } @@ -734,7 +749,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { } resp, err := client.Do(req) updateURLMetrics(ctx, r, resp, err) - r.retry.After(ctx, r, resp, err) + retry.After(ctx, r, resp, err) if err != nil { // we only retry on an HTTP response with 'Retry-After' header return nil, err @@ -749,7 +764,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { done, transformErr := func() (bool, error) { defer resp.Body.Close() - if r.retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) { + if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) { return false, nil } result := r.transformResponse(resp, req) @@ -856,9 +871,10 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp } // Right now we make about ten retry attempts if we get a Retry-After response. + retry := r.retryFn(r.maxRetries) for { - if err := r.retry.Before(ctx, r); err != nil { - return r.retry.WrapPreviousError(err) + if err := retry.Before(ctx, r); err != nil { + return retry.WrapPreviousError(err) } req, err := r.newHTTPRequest(ctx) if err != nil { @@ -871,7 +887,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) { metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength)) } - r.retry.After(ctx, r, resp, err) + retry.After(ctx, r, resp, err) done := func() bool { defer readAndCloseResponseBody(resp) @@ -884,7 +900,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp fn(req, resp) } - if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) { + if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) { return false } @@ -892,7 +908,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp return true }() if done { - return r.retry.WrapPreviousError(err) + return retry.WrapPreviousError(err) } } } diff --git a/staging/src/k8s.io/client-go/rest/request_test.go b/staging/src/k8s.io/client-go/rest/request_test.go index 8c3c6508201..c685fce7b7e 100644 --- a/staging/src/k8s.io/client-go/rest/request_test.go +++ b/staging/src/k8s.io/client-go/rest/request_test.go @@ -998,7 +998,8 @@ func TestRequestWatch(t *testing.T) { c.Client = client } testCase.Request.backoff = &noSleepBackOff{} - testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries} + testCase.Request.maxRetries = testCase.maxRetries + testCase.Request.retryFn = defaultRequestRetryFn watch, err := testCase.Request.Watch(context.Background()) @@ -1211,7 +1212,8 @@ func TestRequestStream(t *testing.T) { c.Client = client } testCase.Request.backoff = &noSleepBackOff{} - testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries} + testCase.Request.maxRetries = testCase.maxRetries + testCase.Request.retryFn = defaultRequestRetryFn body, err := testCase.Request.Stream(context.Background()) @@ -1266,7 +1268,7 @@ func TestRequestDo(t *testing.T) { } for i, testCase := range testCases { testCase.Request.backoff = &NoBackoff{} - testCase.Request.retry = &withRetry{} + testCase.Request.retryFn = defaultRequestRetryFn body, err := testCase.Request.Do(context.Background()).Raw() hasErr := err != nil if hasErr != testCase.Err { @@ -1429,8 +1431,9 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) { return nil, &net.OpError{Err: syscall.ECONNRESET} }), }, - backoff: backoff, - retry: &withRetry{maxRetries: 10}, + backoff: backoff, + maxRetries: 10, + retryFn: defaultRequestRetryFn, } // We expect two retries of "connection reset by peer" and the success. _, err := req.Do(context.Background()).Raw() @@ -2504,8 +2507,9 @@ func TestRequestWithRetry(t *testing.T) { c: &RESTClient{ Client: client, }, - backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: 1}, + backoff: &noSleepBackOff{}, + maxRetries: 1, + retryFn: defaultRequestRetryFn, } var transformFuncInvoked int @@ -2782,8 +2786,9 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont content: defaultContentConfig(), Client: client, }, - backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: test.maxRetries}, + backoff: &noSleepBackOff{}, + maxRetries: test.maxRetries, + retryFn: defaultRequestRetryFn, } doFunc(context.Background(), req) @@ -3006,7 +3011,8 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc pathPrefix: "/api/v1", rateLimiter: interceptor, backoff: interceptor, - retry: &withRetry{maxRetries: test.maxRetries}, + maxRetries: test.maxRetries, + retryFn: defaultRequestRetryFn, } doFunc(ctx, req) @@ -3140,7 +3146,7 @@ func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context. pathPrefix: "/api/v1", rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(), backoff: &NoBackoff{}, - retry: interceptor, + retryFn: func(_ int) WithRetry { return interceptor }, } doFunc(context.Background(), req) @@ -3315,7 +3321,8 @@ func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r pathPrefix: "/api/v1", rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(), backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: test.maxRetries}, + maxRetries: test.maxRetries, + retryFn: defaultRequestRetryFn, } err = doFunc(context.Background(), req) @@ -3618,8 +3625,9 @@ func TestRequestBodyResetOrder(t *testing.T) { content: defaultContentConfig(), Client: client, }, - backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: 1}, + backoff: &noSleepBackOff{}, + maxRetries: 1, + retryFn: defaultRequestRetryFn, } req.Do(context.Background()) diff --git a/staging/src/k8s.io/client-go/rest/with_retry.go b/staging/src/k8s.io/client-go/rest/with_retry.go index 3082959d186..497d2608f88 100644 --- a/staging/src/k8s.io/client-go/rest/with_retry.go +++ b/staging/src/k8s.io/client-go/rest/with_retry.go @@ -52,12 +52,6 @@ var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool { // Note that WithRetry is not safe for concurrent use by multiple // goroutines without additional locking or coordination. type WithRetry interface { - // SetMaxRetries makes the request use the specified integer as a ceiling - // for retries upon receiving a 429 status code and the "Retry-After" header - // in the response. - // A zero maxRetries should prevent from doing any retry and return immediately. - SetMaxRetries(maxRetries int) - // IsNextRetry advances the retry counter appropriately // and returns true if the request should be retried, // otherwise it returns false, if: @@ -144,13 +138,6 @@ type withRetry struct { previousErr, currentErr error } -func (r *withRetry) SetMaxRetries(maxRetries int) { - if maxRetries < 0 { - maxRetries = 0 - } - r.maxRetries = maxRetries -} - func (r *withRetry) trackPreviousError(err error) { // keep track of two most recent errors if r.currentErr != nil {