mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #109114 from tkashem/client-go-retry-thread-safe
client-go: make retry in Request thread safe
This commit is contained in:
commit
0424c7c74d
@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string {
|
|||||||
|
|
||||||
var noBackoff = &NoBackoff{}
|
var noBackoff = &NoBackoff{}
|
||||||
|
|
||||||
|
type requestRetryFunc func(maxRetries int) WithRetry
|
||||||
|
|
||||||
|
func defaultRequestRetryFn(maxRetries int) WithRetry {
|
||||||
|
return &withRetry{maxRetries: maxRetries}
|
||||||
|
}
|
||||||
|
|
||||||
// Request allows for building up a request to a server in a chained fashion.
|
// Request allows for building up a request to a server in a chained fashion.
|
||||||
// Any errors are stored until the end of your call, so you only have to
|
// Any errors are stored until the end of your call, so you only have to
|
||||||
// check once.
|
// check once.
|
||||||
@ -93,6 +99,7 @@ type Request struct {
|
|||||||
rateLimiter flowcontrol.RateLimiter
|
rateLimiter flowcontrol.RateLimiter
|
||||||
backoff BackoffManager
|
backoff BackoffManager
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
maxRetries int
|
||||||
|
|
||||||
// generic components accessible via method setters
|
// generic components accessible via method setters
|
||||||
verb string
|
verb string
|
||||||
@ -109,9 +116,10 @@ type Request struct {
|
|||||||
subresource string
|
subresource string
|
||||||
|
|
||||||
// output
|
// output
|
||||||
err error
|
err error
|
||||||
body io.Reader
|
body io.Reader
|
||||||
retry WithRetry
|
|
||||||
|
retryFn requestRetryFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
|
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
|
||||||
@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request {
|
|||||||
backoff: backoff,
|
backoff: backoff,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
pathPrefix: pathPrefix,
|
pathPrefix: pathPrefix,
|
||||||
retry: &withRetry{maxRetries: 10},
|
maxRetries: 10,
|
||||||
|
retryFn: defaultRequestRetryFn,
|
||||||
warningHandler: c.warningHandler,
|
warningHandler: c.warningHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request {
|
|||||||
// function is specifically called with a different value.
|
// function is specifically called with a different value.
|
||||||
// A zero maxRetries prevent it from doing retires and return an error immediately.
|
// A zero maxRetries prevent it from doing retires and return an error immediately.
|
||||||
func (r *Request) MaxRetries(maxRetries int) *Request {
|
func (r *Request) MaxRetries(maxRetries int) *Request {
|
||||||
r.retry.SetMaxRetries(maxRetries)
|
if maxRetries < 0 {
|
||||||
|
maxRetries = 0
|
||||||
|
}
|
||||||
|
r.maxRetries = maxRetries
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -612,19 +624,21 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
retry := r.retryFn(r.maxRetries)
|
||||||
url := r.URL().String()
|
url := r.URL().String()
|
||||||
for {
|
for {
|
||||||
if err := r.retry.Before(ctx, r); err != nil {
|
if err := retry.Before(ctx, r); err != nil {
|
||||||
return nil, r.retry.WrapPreviousError(err)
|
return nil, retry.WrapPreviousError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := r.newHTTPRequest(ctx)
|
req, err := r.newHTTPRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
updateURLMetrics(ctx, r, resp, err)
|
updateURLMetrics(ctx, r, resp, err)
|
||||||
r.retry.After(ctx, r, resp, err)
|
retry.After(ctx, r, resp, err)
|
||||||
if err == nil && resp.StatusCode == http.StatusOK {
|
if err == nil && resp.StatusCode == http.StatusOK {
|
||||||
return r.newStreamWatcher(resp)
|
return r.newStreamWatcher(resp)
|
||||||
}
|
}
|
||||||
@ -632,7 +646,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
|||||||
done, transformErr := func() (bool, error) {
|
done, transformErr := func() (bool, error) {
|
||||||
defer readAndCloseResponseBody(resp)
|
defer readAndCloseResponseBody(resp)
|
||||||
|
|
||||||
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -654,7 +668,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
|||||||
// we need to return the error object from that.
|
// we need to return the error object from that.
|
||||||
err = transformErr
|
err = transformErr
|
||||||
}
|
}
|
||||||
return nil, r.retry.WrapPreviousError(err)
|
return nil, retry.WrapPreviousError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -719,9 +733,10 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
|||||||
client = http.DefaultClient
|
client = http.DefaultClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
retry := r.retryFn(r.maxRetries)
|
||||||
url := r.URL().String()
|
url := r.URL().String()
|
||||||
for {
|
for {
|
||||||
if err := r.retry.Before(ctx, r); err != nil {
|
if err := retry.Before(ctx, r); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -734,7 +749,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
|||||||
}
|
}
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
updateURLMetrics(ctx, r, resp, err)
|
updateURLMetrics(ctx, r, resp, err)
|
||||||
r.retry.After(ctx, r, resp, err)
|
retry.After(ctx, r, resp, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// we only retry on an HTTP response with 'Retry-After' header
|
// we only retry on an HTTP response with 'Retry-After' header
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -749,7 +764,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
|||||||
done, transformErr := func() (bool, error) {
|
done, transformErr := func() (bool, error) {
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if r.retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
|
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
result := r.transformResponse(resp, req)
|
result := r.transformResponse(resp, req)
|
||||||
@ -856,9 +871,10 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Right now we make about ten retry attempts if we get a Retry-After response.
|
// Right now we make about ten retry attempts if we get a Retry-After response.
|
||||||
|
retry := r.retryFn(r.maxRetries)
|
||||||
for {
|
for {
|
||||||
if err := r.retry.Before(ctx, r); err != nil {
|
if err := retry.Before(ctx, r); err != nil {
|
||||||
return r.retry.WrapPreviousError(err)
|
return retry.WrapPreviousError(err)
|
||||||
}
|
}
|
||||||
req, err := r.newHTTPRequest(ctx)
|
req, err := r.newHTTPRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -871,7 +887,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
|||||||
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
|
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
|
||||||
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
|
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
|
||||||
}
|
}
|
||||||
r.retry.After(ctx, r, resp, err)
|
retry.After(ctx, r, resp, err)
|
||||||
|
|
||||||
done := func() bool {
|
done := func() bool {
|
||||||
defer readAndCloseResponseBody(resp)
|
defer readAndCloseResponseBody(resp)
|
||||||
@ -884,7 +900,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
|||||||
fn(req, resp)
|
fn(req, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -892,7 +908,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
|||||||
return true
|
return true
|
||||||
}()
|
}()
|
||||||
if done {
|
if done {
|
||||||
return r.retry.WrapPreviousError(err)
|
return retry.WrapPreviousError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -998,7 +998,8 @@ func TestRequestWatch(t *testing.T) {
|
|||||||
c.Client = client
|
c.Client = client
|
||||||
}
|
}
|
||||||
testCase.Request.backoff = &noSleepBackOff{}
|
testCase.Request.backoff = &noSleepBackOff{}
|
||||||
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
|
testCase.Request.maxRetries = testCase.maxRetries
|
||||||
|
testCase.Request.retryFn = defaultRequestRetryFn
|
||||||
|
|
||||||
watch, err := testCase.Request.Watch(context.Background())
|
watch, err := testCase.Request.Watch(context.Background())
|
||||||
|
|
||||||
@ -1211,7 +1212,8 @@ func TestRequestStream(t *testing.T) {
|
|||||||
c.Client = client
|
c.Client = client
|
||||||
}
|
}
|
||||||
testCase.Request.backoff = &noSleepBackOff{}
|
testCase.Request.backoff = &noSleepBackOff{}
|
||||||
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
|
testCase.Request.maxRetries = testCase.maxRetries
|
||||||
|
testCase.Request.retryFn = defaultRequestRetryFn
|
||||||
|
|
||||||
body, err := testCase.Request.Stream(context.Background())
|
body, err := testCase.Request.Stream(context.Background())
|
||||||
|
|
||||||
@ -1266,7 +1268,7 @@ func TestRequestDo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
testCase.Request.backoff = &NoBackoff{}
|
testCase.Request.backoff = &NoBackoff{}
|
||||||
testCase.Request.retry = &withRetry{}
|
testCase.Request.retryFn = defaultRequestRetryFn
|
||||||
body, err := testCase.Request.Do(context.Background()).Raw()
|
body, err := testCase.Request.Do(context.Background()).Raw()
|
||||||
hasErr := err != nil
|
hasErr := err != nil
|
||||||
if hasErr != testCase.Err {
|
if hasErr != testCase.Err {
|
||||||
@ -1429,8 +1431,9 @@ func TestConnectionResetByPeerIsRetried(t *testing.T) {
|
|||||||
return nil, &net.OpError{Err: syscall.ECONNRESET}
|
return nil, &net.OpError{Err: syscall.ECONNRESET}
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
backoff: backoff,
|
backoff: backoff,
|
||||||
retry: &withRetry{maxRetries: 10},
|
maxRetries: 10,
|
||||||
|
retryFn: defaultRequestRetryFn,
|
||||||
}
|
}
|
||||||
// We expect two retries of "connection reset by peer" and the success.
|
// We expect two retries of "connection reset by peer" and the success.
|
||||||
_, err := req.Do(context.Background()).Raw()
|
_, err := req.Do(context.Background()).Raw()
|
||||||
@ -2504,8 +2507,9 @@ func TestRequestWithRetry(t *testing.T) {
|
|||||||
c: &RESTClient{
|
c: &RESTClient{
|
||||||
Client: client,
|
Client: client,
|
||||||
},
|
},
|
||||||
backoff: &noSleepBackOff{},
|
backoff: &noSleepBackOff{},
|
||||||
retry: &withRetry{maxRetries: 1},
|
maxRetries: 1,
|
||||||
|
retryFn: defaultRequestRetryFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
var transformFuncInvoked int
|
var transformFuncInvoked int
|
||||||
@ -2782,8 +2786,9 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont
|
|||||||
content: defaultContentConfig(),
|
content: defaultContentConfig(),
|
||||||
Client: client,
|
Client: client,
|
||||||
},
|
},
|
||||||
backoff: &noSleepBackOff{},
|
backoff: &noSleepBackOff{},
|
||||||
retry: &withRetry{maxRetries: test.maxRetries},
|
maxRetries: test.maxRetries,
|
||||||
|
retryFn: defaultRequestRetryFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
doFunc(context.Background(), req)
|
doFunc(context.Background(), req)
|
||||||
@ -3006,7 +3011,8 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
|
|||||||
pathPrefix: "/api/v1",
|
pathPrefix: "/api/v1",
|
||||||
rateLimiter: interceptor,
|
rateLimiter: interceptor,
|
||||||
backoff: interceptor,
|
backoff: interceptor,
|
||||||
retry: &withRetry{maxRetries: test.maxRetries},
|
maxRetries: test.maxRetries,
|
||||||
|
retryFn: defaultRequestRetryFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
doFunc(ctx, req)
|
doFunc(ctx, req)
|
||||||
@ -3140,7 +3146,7 @@ func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.
|
|||||||
pathPrefix: "/api/v1",
|
pathPrefix: "/api/v1",
|
||||||
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
||||||
backoff: &NoBackoff{},
|
backoff: &NoBackoff{},
|
||||||
retry: interceptor,
|
retryFn: func(_ int) WithRetry { return interceptor },
|
||||||
}
|
}
|
||||||
|
|
||||||
doFunc(context.Background(), req)
|
doFunc(context.Background(), req)
|
||||||
@ -3315,7 +3321,8 @@ func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r
|
|||||||
pathPrefix: "/api/v1",
|
pathPrefix: "/api/v1",
|
||||||
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
|
||||||
backoff: &noSleepBackOff{},
|
backoff: &noSleepBackOff{},
|
||||||
retry: &withRetry{maxRetries: test.maxRetries},
|
maxRetries: test.maxRetries,
|
||||||
|
retryFn: defaultRequestRetryFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = doFunc(context.Background(), req)
|
err = doFunc(context.Background(), req)
|
||||||
@ -3618,8 +3625,9 @@ func TestRequestBodyResetOrder(t *testing.T) {
|
|||||||
content: defaultContentConfig(),
|
content: defaultContentConfig(),
|
||||||
Client: client,
|
Client: client,
|
||||||
},
|
},
|
||||||
backoff: &noSleepBackOff{},
|
backoff: &noSleepBackOff{},
|
||||||
retry: &withRetry{maxRetries: 1},
|
maxRetries: 1,
|
||||||
|
retryFn: defaultRequestRetryFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Do(context.Background())
|
req.Do(context.Background())
|
||||||
|
@ -52,12 +52,6 @@ var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
|
|||||||
// Note that WithRetry is not safe for concurrent use by multiple
|
// Note that WithRetry is not safe for concurrent use by multiple
|
||||||
// goroutines without additional locking or coordination.
|
// goroutines without additional locking or coordination.
|
||||||
type WithRetry interface {
|
type WithRetry interface {
|
||||||
// SetMaxRetries makes the request use the specified integer as a ceiling
|
|
||||||
// for retries upon receiving a 429 status code and the "Retry-After" header
|
|
||||||
// in the response.
|
|
||||||
// A zero maxRetries should prevent from doing any retry and return immediately.
|
|
||||||
SetMaxRetries(maxRetries int)
|
|
||||||
|
|
||||||
// IsNextRetry advances the retry counter appropriately
|
// IsNextRetry advances the retry counter appropriately
|
||||||
// and returns true if the request should be retried,
|
// and returns true if the request should be retried,
|
||||||
// otherwise it returns false, if:
|
// otherwise it returns false, if:
|
||||||
@ -144,13 +138,6 @@ type withRetry struct {
|
|||||||
previousErr, currentErr error
|
previousErr, currentErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *withRetry) SetMaxRetries(maxRetries int) {
|
|
||||||
if maxRetries < 0 {
|
|
||||||
maxRetries = 0
|
|
||||||
}
|
|
||||||
r.maxRetries = maxRetries
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *withRetry) trackPreviousError(err error) {
|
func (r *withRetry) trackPreviousError(err error) {
|
||||||
// keep track of two most recent errors
|
// keep track of two most recent errors
|
||||||
if r.currentErr != nil {
|
if r.currentErr != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user