diff --git a/go.mod b/go.mod index d285878be..517f817fb 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac google.golang.org/protobuf v1.27.1 k8s.io/api v0.0.0-20220331200636-27e5860b52b6 - k8s.io/apimachinery v0.0.0-20220124172104-276a8a7530a3 + k8s.io/apimachinery v0.0.0-20220919103122-c74de7aa33d7 k8s.io/klog/v2 v2.30.0 k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 k8s.io/utils v0.0.0-20211116205334-6203023598ed @@ -41,5 +41,5 @@ require ( replace ( k8s.io/api => k8s.io/api v0.0.0-20220331200636-27e5860b52b6 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220124172104-276a8a7530a3 + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220919103122-c74de7aa33d7 ) diff --git a/go.sum b/go.sum index 88544db4c..d2a3162db 100644 --- a/go.sum +++ b/go.sum @@ -612,8 +612,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.0.0-20220331200636-27e5860b52b6 h1:4dgxatAVTPABorofxL+Z3U2MLAIA8NTrgecUAnCao6w= k8s.io/api v0.0.0-20220331200636-27e5860b52b6/go.mod h1:9Vo9KEQyesRPDP6eG+jDkliZvSar3OLe8f5zfqiRoec= -k8s.io/apimachinery v0.0.0-20220124172104-276a8a7530a3 h1:n2y4Rh6ixZSeru9imTOu4bYNuAObzwMBnUcPVFzaXnk= -k8s.io/apimachinery v0.0.0-20220124172104-276a8a7530a3/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hrM= +k8s.io/apimachinery v0.0.0-20220919103122-c74de7aa33d7 h1:Pr7Y4uDWlUJfc/WJKdECI/uiizC87vTHfwZeXar7rk0= +k8s.io/apimachinery v0.0.0-20220919103122-c74de7aa33d7/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hrM= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= diff --git a/rest/request.go b/rest/request.go index 5cc9900b0..2061fb543 100644 --- a/rest/request.go +++ b/rest/request.go @@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string { var noBackoff = &NoBackoff{} +type requestRetryFunc func(maxRetries int) WithRetry + +func defaultRequestRetryFn(maxRetries int) WithRetry { + return &withRetry{maxRetries: maxRetries} +} + // Request allows for building up a request to a server in a chained fashion. // Any errors are stored until the end of your call, so you only have to // check once. @@ -93,6 +99,7 @@ type Request struct { rateLimiter flowcontrol.RateLimiter backoff BackoffManager timeout time.Duration + maxRetries int // generic components accessible via method setters verb string @@ -109,9 +116,10 @@ type Request struct { subresource string // output - err error - body io.Reader - retry WithRetry + err error + body io.Reader + + retryFn requestRetryFunc } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. @@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request { backoff: backoff, timeout: timeout, pathPrefix: pathPrefix, - retry: &withRetry{maxRetries: 10}, + maxRetries: 10, + retryFn: defaultRequestRetryFn, warningHandler: c.warningHandler, } @@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request { // function is specifically called with a different value. // A zero maxRetries prevent it from doing retires and return an error immediately. func (r *Request) MaxRetries(maxRetries int) *Request { - r.retry.SetMaxRetries(maxRetries) + if maxRetries < 0 { + maxRetries = 0 + } + r.maxRetries = maxRetries return r } @@ -688,8 +700,10 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { } return false } + var retryAfter *RetryAfter url := r.URL().String() + withRetry := r.retryFn(r.maxRetries) for { req, err := r.newHTTPRequest(ctx) if err != nil { @@ -724,9 +738,9 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { defer readAndCloseResponseBody(resp) var retry bool - retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc) + retryAfter, retry = withRetry.NextRetry(req, resp, err, isErrRetryableFunc) if retry { - err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) + err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) if err == nil { return false, nil } @@ -817,6 +831,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { } var retryAfter *RetryAfter + withRetry := r.retryFn(r.maxRetries) url := r.URL().String() for { req, err := r.newHTTPRequest(ctx) @@ -862,9 +877,9 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { defer resp.Body.Close() var retry bool - retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError) + retryAfter, retry = withRetry.NextRetry(req, resp, err, neverRetryError) if retry { - err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) + err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body) if err == nil { return false, nil } @@ -961,6 +976,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp // Right now we make about ten retry attempts if we get a Retry-After response. var retryAfter *RetryAfter + withRetry := r.retryFn(r.maxRetries) for { req, err := r.newHTTPRequest(ctx) if err != nil { @@ -997,7 +1013,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp } var retry bool - retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool { + retryAfter, retry = withRetry.NextRetry(req, resp, err, func(req *http.Request, err error) bool { // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors. // Thus in case of "GET" operations, we simply retry it. // We are not automatically retrying "write" operations, as they are not idempotent. @@ -1011,7 +1027,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp return false }) if retry { - err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body) + err := withRetry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body) if err == nil { return false } diff --git a/rest/request_test.go b/rest/request_test.go index 8e9bb92b5..dc1c4e51a 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -32,6 +32,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "syscall" "testing" "time" @@ -1194,7 +1195,8 @@ func TestRequestWatch(t *testing.T) { c.Client = client } testCase.Request.backoff = &noSleepBackOff{} - testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries} + testCase.Request.maxRetries = testCase.maxRetries + testCase.Request.retryFn = defaultRequestRetryFn watch, err := testCase.Request.Watch(context.Background()) @@ -1407,7 +1409,8 @@ func TestRequestStream(t *testing.T) { c.Client = client } testCase.Request.backoff = &noSleepBackOff{} - testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries} + testCase.Request.maxRetries = testCase.maxRetries + testCase.Request.retryFn = defaultRequestRetryFn body, err := testCase.Request.Stream(context.Background()) @@ -1462,7 +1465,7 @@ func TestRequestDo(t *testing.T) { } for i, testCase := range testCases { testCase.Request.backoff = &NoBackoff{} - testCase.Request.retry = &withRetry{} + testCase.Request.retryFn = defaultRequestRetryFn body, err := testCase.Request.Do(context.Background()).Raw() hasErr := err != nil if hasErr != testCase.Err { @@ -1625,8 +1628,9 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) { return nil, &net.OpError{Err: syscall.ECONNRESET} }), }, - backoff: backoff, - retry: &withRetry{maxRetries: 10}, + backoff: backoff, + maxRetries: 10, + retryFn: defaultRequestRetryFn, } // We expect two retries of "connection reset by peer" and the success. _, err := req.Do(context.Background()).Raw() @@ -2699,8 +2703,9 @@ func TestRequestWithRetry(t *testing.T) { c: &RESTClient{ Client: client, }, - backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: 1}, + backoff: &noSleepBackOff{}, + maxRetries: 1, + retryFn: defaultRequestRetryFn, } var transformFuncInvoked int @@ -2890,8 +2895,9 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont content: defaultContentConfig(), Client: client, }, - backoff: &noSleepBackOff{}, - retry: &withRetry{maxRetries: test.maxRetries}, + backoff: &noSleepBackOff{}, + maxRetries: test.maxRetries, + retryFn: defaultRequestRetryFn, } doFunc(context.Background(), req) @@ -3093,3 +3099,50 @@ func TestTransportConcurrency(t *testing.T) { }) } } + +func TestRequestConcurrencyWithRetry(t *testing.T) { + var attempts int32 + client := clientForFunc(func(req *http.Request) (*http.Response, error) { + defer func() { + atomic.AddInt32(&attempts, 1) + }() + + // always send a retry-after response + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Header: http.Header{"Retry-After": []string{"1"}}, + }, nil + }) + + req := &Request{ + verb: "POST", + c: &RESTClient{ + content: defaultContentConfig(), + Client: client, + }, + backoff: &noSleepBackOff{}, + maxRetries: 9, // 10 attempts in total, including the first + retryFn: defaultRequestRetryFn, + } + + concurrency := 20 + wg := sync.WaitGroup{} + wg.Add(concurrency) + startCh := make(chan struct{}) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + <-startCh + req.Do(context.Background()) + }() + } + + close(startCh) + wg.Wait() + + // we expect (concurrency*req.maxRetries+1) attempts to be recorded + expected := concurrency * (req.maxRetries + 1) + if atomic.LoadInt32(&attempts) != int32(expected) { + t.Errorf("Expected attempts: %d, but got: %d", expected, attempts) + } +}