mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-06 11:46:36 +00:00
various context related cleanups to rest.Request
* Move all usage of r.ctx to the beginning of Do, DoRaw, Stream, Watch * Move tryThrottle from Do and DoRaw into request() * Make request() and tryThrottle take a context * In request(), remove the timeout context setting out of the loop These changes should be entirely behavior preserving. Kubernetes-commit: d95ed2c8470158256466fb24728e63ac3afe0899
This commit is contained in:
parent
996f3529d3
commit
c4a6de2f33
@ -548,18 +548,14 @@ func (r Request) finalURLTemplate() url.URL {
|
|||||||
return *url
|
return *url
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Request) tryThrottle() error {
|
func (r *Request) tryThrottle(ctx context.Context) error {
|
||||||
if r.rateLimiter == nil {
|
if r.rateLimiter == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
var err error
|
|
||||||
if r.ctx != nil {
|
err := r.rateLimiter.Wait(ctx)
|
||||||
err = r.rateLimiter.Wait(r.ctx)
|
|
||||||
} else {
|
|
||||||
r.rateLimiter.Accept()
|
|
||||||
}
|
|
||||||
|
|
||||||
if latency := time.Since(now); latency > longThrottleLatency {
|
if latency := time.Since(now); latency > longThrottleLatency {
|
||||||
klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
|
klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
|
||||||
@ -571,6 +567,11 @@ func (r *Request) tryThrottle() error {
|
|||||||
// Watch attempts to begin watching the requested location.
|
// Watch attempts to begin watching the requested location.
|
||||||
// Returns a watch.Interface, or an error.
|
// Returns a watch.Interface, or an error.
|
||||||
func (r *Request) Watch() (watch.Interface, error) {
|
func (r *Request) Watch() (watch.Interface, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
if r.ctx != nil {
|
||||||
|
ctx = r.ctx
|
||||||
|
}
|
||||||
|
|
||||||
// We specifically don't want to rate limit watches, so we
|
// We specifically don't want to rate limit watches, so we
|
||||||
// don't use r.rateLimiter here.
|
// don't use r.rateLimiter here.
|
||||||
if r.err != nil {
|
if r.err != nil {
|
||||||
@ -582,9 +583,7 @@ func (r *Request) Watch() (watch.Interface, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if r.ctx != nil {
|
req = req.WithContext(ctx)
|
||||||
req = req.WithContext(r.ctx)
|
|
||||||
}
|
|
||||||
req.Header = r.headers
|
req.Header = r.headers
|
||||||
client := r.c.Client
|
client := r.c.Client
|
||||||
if client == nil {
|
if client == nil {
|
||||||
@ -660,11 +659,16 @@ func updateURLMetrics(req *Request, resp *http.Response, err error) {
|
|||||||
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
|
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
|
||||||
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
|
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
|
||||||
func (r *Request) Stream() (io.ReadCloser, error) {
|
func (r *Request) Stream() (io.ReadCloser, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
if r.ctx != nil {
|
||||||
|
ctx = r.ctx
|
||||||
|
}
|
||||||
|
|
||||||
if r.err != nil {
|
if r.err != nil {
|
||||||
return nil, r.err
|
return nil, r.err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.tryThrottle(); err != nil {
|
if err := r.tryThrottle(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -676,9 +680,7 @@ func (r *Request) Stream() (io.ReadCloser, error) {
|
|||||||
if r.body != nil {
|
if r.body != nil {
|
||||||
req.Body = ioutil.NopCloser(r.body)
|
req.Body = ioutil.NopCloser(r.body)
|
||||||
}
|
}
|
||||||
if r.ctx != nil {
|
req = req.WithContext(ctx)
|
||||||
req = req.WithContext(r.ctx)
|
|
||||||
}
|
|
||||||
req.Header = r.headers
|
req.Header = r.headers
|
||||||
client := r.c.Client
|
client := r.c.Client
|
||||||
if client == nil {
|
if client == nil {
|
||||||
@ -746,7 +748,7 @@ func (r *Request) requestPreflightCheck() error {
|
|||||||
// received. It handles retry behavior and up front validation of requests. It will invoke
|
// received. It handles retry behavior and up front validation of requests. It will invoke
|
||||||
// fn at most once. It will return an error if a problem occurred prior to connecting to the
|
// fn at most once. It will return an error if a problem occurred prior to connecting to the
|
||||||
// server - the provided function is responsible for handling server errors.
|
// server - the provided function is responsible for handling server errors.
|
||||||
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
|
||||||
//Metrics for total request latency
|
//Metrics for total request latency
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -767,6 +769,19 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
|||||||
client = http.DefaultClient
|
client = http.DefaultClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Throttle the first try before setting up the timeout configured on the
|
||||||
|
// client. We don't want a throttled client to return timeouts to callers
|
||||||
|
// before it makes a single request.
|
||||||
|
if err := r.tryThrottle(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.timeout > 0 {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, r.timeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
// Right now we make about ten retry attempts if we get a Retry-After response.
|
// Right now we make about ten retry attempts if we get a Retry-After response.
|
||||||
maxRetries := 10
|
maxRetries := 10
|
||||||
retries := 0
|
retries := 0
|
||||||
@ -776,17 +791,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if r.timeout > 0 {
|
req = req.WithContext(ctx)
|
||||||
if r.ctx == nil {
|
|
||||||
r.ctx = context.Background()
|
|
||||||
}
|
|
||||||
var cancelFn context.CancelFunc
|
|
||||||
r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
|
|
||||||
defer cancelFn()
|
|
||||||
}
|
|
||||||
if r.ctx != nil {
|
|
||||||
req = req.WithContext(r.ctx)
|
|
||||||
}
|
|
||||||
req.Header = r.headers
|
req.Header = r.headers
|
||||||
|
|
||||||
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
|
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
|
||||||
@ -794,7 +799,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
|||||||
// We are retrying the request that we already send to apiserver
|
// We are retrying the request that we already send to apiserver
|
||||||
// at least once before.
|
// at least once before.
|
||||||
// This request should also be throttled with the client-internal rate limiter.
|
// This request should also be throttled with the client-internal rate limiter.
|
||||||
if err := r.tryThrottle(); err != nil {
|
if err := r.tryThrottle(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -870,12 +875,13 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
|||||||
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
|
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
|
||||||
// * http.Client.Do errors are returned directly.
|
// * http.Client.Do errors are returned directly.
|
||||||
func (r *Request) Do() Result {
|
func (r *Request) Do() Result {
|
||||||
if err := r.tryThrottle(); err != nil {
|
ctx := context.Background()
|
||||||
return Result{err: err}
|
if r.ctx != nil {
|
||||||
|
ctx = r.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
var result Result
|
var result Result
|
||||||
err := r.request(func(req *http.Request, resp *http.Response) {
|
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
|
||||||
result = r.transformResponse(resp, req)
|
result = r.transformResponse(resp, req)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -886,12 +892,13 @@ func (r *Request) Do() Result {
|
|||||||
|
|
||||||
// DoRaw executes the request but does not process the response body.
|
// DoRaw executes the request but does not process the response body.
|
||||||
func (r *Request) DoRaw() ([]byte, error) {
|
func (r *Request) DoRaw() ([]byte, error) {
|
||||||
if err := r.tryThrottle(); err != nil {
|
ctx := context.Background()
|
||||||
return nil, err
|
if r.ctx != nil {
|
||||||
|
ctx = r.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
var result Result
|
var result Result
|
||||||
err := r.request(func(req *http.Request, resp *http.Response) {
|
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
|
||||||
result.body, result.err = ioutil.ReadAll(resp.Body)
|
result.body, result.err = ioutil.ReadAll(resp.Body)
|
||||||
glogBody("Response Body", result.body)
|
glogBody("Response Body", result.body)
|
||||||
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
|
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
|
||||||
|
Loading…
Reference in New Issue
Block a user