mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-08 18:39:16 +00:00
client-go: make retry in Request thread safe
Kubernetes-commit: 6618b8ef7c0b552839555d4578b64427d20524ef
This commit is contained in:
committed by
Kubernetes Publisher
parent
33011f1487
commit
d8531f5ff0
@@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string {
|
||||
|
||||
var noBackoff = &NoBackoff{}
|
||||
|
||||
type requestRetryFunc func(maxRetries int) WithRetry
|
||||
|
||||
func defaultRequestRetryFn(maxRetries int) WithRetry {
|
||||
return &withRetry{maxRetries: maxRetries}
|
||||
}
|
||||
|
||||
// Request allows for building up a request to a server in a chained fashion.
|
||||
// Any errors are stored until the end of your call, so you only have to
|
||||
// check once.
|
||||
@@ -93,6 +99,7 @@ type Request struct {
|
||||
rateLimiter flowcontrol.RateLimiter
|
||||
backoff BackoffManager
|
||||
timeout time.Duration
|
||||
maxRetries int
|
||||
|
||||
// generic components accessible via method setters
|
||||
verb string
|
||||
@@ -109,9 +116,10 @@ type Request struct {
|
||||
subresource string
|
||||
|
||||
// output
|
||||
err error
|
||||
body io.Reader
|
||||
retry WithRetry
|
||||
err error
|
||||
body io.Reader
|
||||
|
||||
retryFn requestRetryFunc
|
||||
}
|
||||
|
||||
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
|
||||
@@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request {
|
||||
backoff: backoff,
|
||||
timeout: timeout,
|
||||
pathPrefix: pathPrefix,
|
||||
retry: &withRetry{maxRetries: 10},
|
||||
maxRetries: 10,
|
||||
retryFn: defaultRequestRetryFn,
|
||||
warningHandler: c.warningHandler,
|
||||
}
|
||||
|
||||
@@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request {
|
||||
// function is specifically called with a different value.
|
||||
// A zero maxRetries prevent it from doing retires and return an error immediately.
|
||||
func (r *Request) MaxRetries(maxRetries int) *Request {
|
||||
r.retry.SetMaxRetries(maxRetries)
|
||||
if maxRetries < 0 {
|
||||
maxRetries = 0
|
||||
}
|
||||
r.maxRetries = maxRetries
|
||||
return r
|
||||
}
|
||||
|
||||
@@ -612,19 +624,21 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
}
|
||||
return false
|
||||
}
|
||||
retry := r.retryFn(r.maxRetries)
|
||||
url := r.URL().String()
|
||||
for {
|
||||
if err := r.retry.Before(ctx, r); err != nil {
|
||||
return nil, r.retry.WrapPreviousError(err)
|
||||
if err := retry.Before(ctx, r); err != nil {
|
||||
return nil, retry.WrapPreviousError(err)
|
||||
}
|
||||
|
||||
req, err := r.newHTTPRequest(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
updateURLMetrics(ctx, r, resp, err)
|
||||
r.retry.After(ctx, r, resp, err)
|
||||
retry.After(ctx, r, resp, err)
|
||||
if err == nil && resp.StatusCode == http.StatusOK {
|
||||
return r.newStreamWatcher(resp)
|
||||
}
|
||||
@@ -632,7 +646,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
done, transformErr := func() (bool, error) {
|
||||
defer readAndCloseResponseBody(resp)
|
||||
|
||||
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -654,7 +668,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
// we need to return the error object from that.
|
||||
err = transformErr
|
||||
}
|
||||
return nil, r.retry.WrapPreviousError(err)
|
||||
return nil, retry.WrapPreviousError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -719,9 +733,10 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||
client = http.DefaultClient
|
||||
}
|
||||
|
||||
retry := r.retryFn(r.maxRetries)
|
||||
url := r.URL().String()
|
||||
for {
|
||||
if err := r.retry.Before(ctx, r); err != nil {
|
||||
if err := retry.Before(ctx, r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -734,7 +749,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
updateURLMetrics(ctx, r, resp, err)
|
||||
r.retry.After(ctx, r, resp, err)
|
||||
retry.After(ctx, r, resp, err)
|
||||
if err != nil {
|
||||
// we only retry on an HTTP response with 'Retry-After' header
|
||||
return nil, err
|
||||
@@ -749,7 +764,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||
done, transformErr := func() (bool, error) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
if r.retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
|
||||
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
|
||||
return false, nil
|
||||
}
|
||||
result := r.transformResponse(resp, req)
|
||||
@@ -856,9 +871,10 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
}
|
||||
|
||||
// Right now we make about ten retry attempts if we get a Retry-After response.
|
||||
retry := r.retryFn(r.maxRetries)
|
||||
for {
|
||||
if err := r.retry.Before(ctx, r); err != nil {
|
||||
return r.retry.WrapPreviousError(err)
|
||||
if err := retry.Before(ctx, r); err != nil {
|
||||
return retry.WrapPreviousError(err)
|
||||
}
|
||||
req, err := r.newHTTPRequest(ctx)
|
||||
if err != nil {
|
||||
@@ -871,7 +887,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
|
||||
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
|
||||
}
|
||||
r.retry.After(ctx, r, resp, err)
|
||||
retry.After(ctx, r, resp, err)
|
||||
|
||||
done := func() bool {
|
||||
defer readAndCloseResponseBody(resp)
|
||||
@@ -884,7 +900,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
fn(req, resp)
|
||||
}
|
||||
|
||||
if r.retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -892,7 +908,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
return true
|
||||
}()
|
||||
if done {
|
||||
return r.retry.WrapPreviousError(err)
|
||||
return retry.WrapPreviousError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user