From c4a6de2f338c145706ac3d18527aa8a760ff0dae Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Mon, 27 Jan 2020 19:52:47 -0800 Subject: [PATCH] various context related cleanups to rest.Request * Move all usage of r.ctx to the beginning of Do, DoRaw, Stream, Watch * Move tryThrottle from Do and DoRaw into request() * Make request() and tryThrottle take a context * In request(), remove the timeout context setting out of the loop These changes should be entirely behavior preserving. Kubernetes-commit: d95ed2c8470158256466fb24728e63ac3afe0899 --- rest/request.go | 73 +++++++++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/rest/request.go b/rest/request.go index 51b76e30..948d3d9c 100644 --- a/rest/request.go +++ b/rest/request.go @@ -548,18 +548,14 @@ func (r Request) finalURLTemplate() url.URL { return *url } -func (r *Request) tryThrottle() error { +func (r *Request) tryThrottle(ctx context.Context) error { if r.rateLimiter == nil { return nil } now := time.Now() - var err error - if r.ctx != nil { - err = r.rateLimiter.Wait(r.ctx) - } else { - r.rateLimiter.Accept() - } + + err := r.rateLimiter.Wait(ctx) if latency := time.Since(now); latency > longThrottleLatency { klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String()) @@ -571,6 +567,11 @@ func (r *Request) tryThrottle() error { // Watch attempts to begin watching the requested location. // Returns a watch.Interface, or an error. func (r *Request) Watch() (watch.Interface, error) { + ctx := context.Background() + if r.ctx != nil { + ctx = r.ctx + } + // We specifically don't want to rate limit watches, so we // don't use r.rateLimiter here. if r.err != nil { @@ -582,9 +583,7 @@ func (r *Request) Watch() (watch.Interface, error) { if err != nil { return nil, err } - if r.ctx != nil { - req = req.WithContext(r.ctx) - } + req = req.WithContext(ctx) req.Header = r.headers client := r.c.Client if client == nil { @@ -660,11 +659,16 @@ func updateURLMetrics(req *Request, resp *http.Response, err error) { // Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object. // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response. func (r *Request) Stream() (io.ReadCloser, error) { + ctx := context.Background() + if r.ctx != nil { + ctx = r.ctx + } + if r.err != nil { return nil, r.err } - if err := r.tryThrottle(); err != nil { + if err := r.tryThrottle(ctx); err != nil { return nil, err } @@ -676,9 +680,7 @@ func (r *Request) Stream() (io.ReadCloser, error) { if r.body != nil { req.Body = ioutil.NopCloser(r.body) } - if r.ctx != nil { - req = req.WithContext(r.ctx) - } + req = req.WithContext(ctx) req.Header = r.headers client := r.c.Client if client == nil { @@ -746,7 +748,7 @@ func (r *Request) requestPreflightCheck() error { // received. It handles retry behavior and up front validation of requests. It will invoke // fn at most once. It will return an error if a problem occurred prior to connecting to the // server - the provided function is responsible for handling server errors. -func (r *Request) request(fn func(*http.Request, *http.Response)) error { +func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error { //Metrics for total request latency start := time.Now() defer func() { @@ -767,6 +769,19 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { client = http.DefaultClient } + // Throttle the first try before setting up the timeout configured on the + // client. We don't want a throttled client to return timeouts to callers + // before it makes a single request. + if err := r.tryThrottle(ctx); err != nil { + return err + } + + if r.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, r.timeout) + defer cancel() + } + // Right now we make about ten retry attempts if we get a Retry-After response. maxRetries := 10 retries := 0 @@ -776,17 +791,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { if err != nil { return err } - if r.timeout > 0 { - if r.ctx == nil { - r.ctx = context.Background() - } - var cancelFn context.CancelFunc - r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout) - defer cancelFn() - } - if r.ctx != nil { - req = req.WithContext(r.ctx) - } + req = req.WithContext(ctx) req.Header = r.headers r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL())) @@ -794,7 +799,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { // We are retrying the request that we already send to apiserver // at least once before. // This request should also be throttled with the client-internal rate limiter. - if err := r.tryThrottle(); err != nil { + if err := r.tryThrottle(ctx); err != nil { return err } } @@ -870,12 +875,13 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error { // * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError // * http.Client.Do errors are returned directly. func (r *Request) Do() Result { - if err := r.tryThrottle(); err != nil { - return Result{err: err} + ctx := context.Background() + if r.ctx != nil { + ctx = r.ctx } var result Result - err := r.request(func(req *http.Request, resp *http.Response) { + err := r.request(ctx, func(req *http.Request, resp *http.Response) { result = r.transformResponse(resp, req) }) if err != nil { @@ -886,12 +892,13 @@ func (r *Request) Do() Result { // DoRaw executes the request but does not process the response body. func (r *Request) DoRaw() ([]byte, error) { - if err := r.tryThrottle(); err != nil { - return nil, err + ctx := context.Background() + if r.ctx != nil { + ctx = r.ctx } var result Result - err := r.request(func(req *http.Request, resp *http.Response) { + err := r.request(ctx, func(req *http.Request, resp *http.Response) { result.body, result.err = ioutil.ReadAll(resp.Body) glogBody("Response Body", result.body) if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {