diff --git a/pkg/client/request.go b/pkg/client/request.go index 41d52b18be4..d87b44dab21 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -510,8 +510,8 @@ func (r *Request) Watch() (watch.Interface, error) { return nil, err } if resp.StatusCode != http.StatusOK { - if _, _, err := r.transformResponse(resp, req, nil); err != nil { - return nil, err + if result := r.transformResponse(resp, req); result.err != nil { + return nil, result.err } return nil, fmt.Errorf("for request '%+v', got status: %v", req.URL, resp.StatusCode) } @@ -619,8 +619,23 @@ func (r *Request) Upgrade(config *Config, newRoundTripperFunc func(*tls.Config) return upgradeRoundTripper.NewConnection(resp) } -// DoRaw executes a raw request which is not subject to interpretation as an API response. -func (r *Request) DoRaw() ([]byte, error) { +// request connects to the server and invokes the provided function when a server response is +// received. It handles retry behavior and up front validation of requests. It wil invoke +// fn at most once. It will return an error if a problem occured prior to connecting to the +// server - the provided function is responsible for handling server errors. +func (r *Request) request(fn func(*http.Request, *http.Response)) error { + if r.err != nil { + return r.err + } + + // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace) + if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 { + return fmt.Errorf("an empty namespace may not be set when a resource name is provided") + } + if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 { + return fmt.Errorf("an empty namespace may not be set during creation") + } + client := r.client if client == nil { client = http.DefaultClient @@ -628,52 +643,38 @@ func (r *Request) DoRaw() ([]byte, error) { // Right now we make about ten retry attempts if we get a Retry-After response. // TODO: Change to a timeout based approach. + maxRetries := 10 retries := 0 - for { - if r.err != nil { - return nil, r.err - } - - // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace) - if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 { - return nil, fmt.Errorf("an empty namespace may not be set when a resource name is provided") - } - if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 { - return nil, fmt.Errorf("an empty namespace may not be set during creation") - } - - var err error - r.req, err = http.NewRequest(r.verb, r.finalURL(), r.body) + url := r.finalURL() + req, err := http.NewRequest(r.verb, r.finalURL(), r.body) if err != nil { - return nil, err + return err } - r.req.Header = r.headers - r.resp, err = client.Do(r.req) - if err != nil { - return nil, err - } - defer r.resp.Body.Close() + req.Header = r.headers - // Check to see if we got a 429 Too Many Requests response code. - if r.resp.StatusCode == errors.StatusTooManyRequests { - if retries < 10 { - retries++ - if waitFor := r.resp.Header.Get("Retry-After"); waitFor != "" { - delay, err := strconv.Atoi(waitFor) - if err == nil { - glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", waitFor, retries, r.finalURL()) - time.Sleep(time.Duration(delay) * time.Second) - continue - } - } + resp, err := client.Do(req) + if err != nil { + return err + } + + done := func() bool { + // ensure the response body is closed before we reconnect, so that we reuse the same + // TCP connection + defer resp.Body.Close() + + retries++ + if seconds, wait := checkWait(resp); wait && retries < maxRetries { + glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url) + time.Sleep(time.Duration(seconds) * time.Second) + return false } + fn(req, resp) + return true + }() + if done { + return nil } - body, err := ioutil.ReadAll(r.resp.Body) - if err != nil { - return nil, err - } - return body, err } } @@ -684,25 +685,42 @@ func (r *Request) DoRaw() ([]byte, error) { // * If the request can't be constructed, or an error happened earlier while building its // arguments: *RequestConstructionError // * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError -// * If the status code and body don't make sense together: *UnexpectedStatusError // * http.Client.Do errors are returned directly. func (r *Request) Do() Result { start := time.Now() defer func() { metrics.RequestLatency.WithLabelValues(r.verb, r.finalURLTemplate()).Observe(metrics.SinceInMicroseconds(start)) }() - body, err := r.DoRaw() + var result Result + err := r.request(func(req *http.Request, resp *http.Response) { + result = r.transformResponse(resp, req) + }) if err != nil { return Result{err: err} } - respBody, created, err := r.transformResponse(r.resp, r.req, body) - return Result{respBody, created, err, r.codec} + return result } -// transformResponse converts an API response into a structured API object. If body is nil, the response -// body will be read to try and gather more response data. -func (r *Request) transformResponse(resp *http.Response, req *http.Request, body []byte) ([]byte, bool, error) { - if body == nil && resp.Body != nil { +// DoRaw executes the request but does not process the response body. +func (r *Request) DoRaw() ([]byte, error) { + start := time.Now() + defer func() { + metrics.RequestLatency.WithLabelValues(r.verb, r.finalURLTemplate()).Observe(metrics.SinceInMicroseconds(start)) + }() + var result Result + err := r.request(func(req *http.Request, resp *http.Response) { + result.body, result.err = ioutil.ReadAll(resp.Body) + }) + if err != nil { + return nil, err + } + return result.body, result.err +} + +// transformResponse converts an API response into a structured API object +func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result { + var body []byte + if resp.Body != nil { if data, err := ioutil.ReadAll(resp.Body); err == nil { body = data } @@ -719,21 +737,23 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request, body // no-op, we've been upgraded case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent: if !isStatusResponse { - return nil, false, r.transformUnstructuredResponseError(resp, req, body) + return Result{err: r.transformUnstructuredResponseError(resp, req, body)} } - return nil, false, errors.FromObject(&status) + return Result{err: errors.FromObject(&status)} } // If the server gave us a status back, look at what it was. success := resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent if isStatusResponse && (status.Status != api.StatusSuccess && !success) { - // "Working" requests need to be handled specially. // "Failed" requests are clearly just an error and it makes sense to return them as such. - return nil, false, errors.FromObject(&status) + return Result{err: errors.FromObject(&status)} } - created := resp.StatusCode == http.StatusCreated - return body, created, nil + return Result{ + body: body, + created: resp.StatusCode == http.StatusCreated, + codec: r.codec, + } } // transformUnstructuredResponseError handles an error from the server that is not in a structured form. @@ -781,6 +801,16 @@ func isTextResponse(resp *http.Response) bool { return strings.HasPrefix(media, "text/") } +// checkWait returns true along with a number of seconds if the server instructed us to wait +// before retrying. +func checkWait(resp *http.Response) (int, bool) { + if resp.StatusCode != errors.StatusTooManyRequests { + return 0, false + } + i, ok := retryAfterSeconds(resp) + return i, ok +} + // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if // the header was missing or not a valid number. func retryAfterSeconds(resp *http.Response) (int, bool) { diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index a553f97091c..fe77a8c5901 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -266,11 +266,8 @@ func TestTransformResponse(t *testing.T) { if test.Response.Body == nil { test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) } - body, err := ioutil.ReadAll(test.Response.Body) - if err != nil { - t.Errorf("failed to read body of response: %v", err) - } - response, created, err := r.transformResponse(test.Response, &http.Request{}, body) + result := r.transformResponse(test.Response, &http.Request{}) + response, created, err := result.body, result.created, result.err hasErr := err != nil if hasErr != test.Error { t.Errorf("%d: unexpected error: %t %v", i, test.Error, err) @@ -356,11 +353,8 @@ func TestTransformUnstructuredError(t *testing.T) { resourceName: testCase.Name, resource: testCase.Resource, } - body, err := ioutil.ReadAll(testCase.Res.Body) - if err != nil { - t.Errorf("failed to read body: %v", err) - } - _, _, err = r.transformResponse(testCase.Res, testCase.Req, body) + result := r.transformResponse(testCase.Res, testCase.Req) + err := result.err if !testCase.ErrFn(err) { t.Errorf("unexpected error: %v", err) continue @@ -748,6 +742,64 @@ func TestDoRequestNewWay(t *testing.T) { } } +func TestCheckRetryClosesBody(t *testing.T) { + count := 0 + ch := make(chan struct{}) + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + count++ + t.Logf("attempt %d", count) + if count >= 5 { + w.WriteHeader(http.StatusOK) + close(ch) + return + } + w.Header().Set("Retry-After", "0") + w.WriteHeader(apierrors.StatusTooManyRequests) + })) + defer testServer.Close() + + c := NewOrDie(&Config{Host: testServer.URL, Version: "v1beta2", Username: "user", Password: "pass"}) + _, err := c.Verb("POST"). + Prefix("foo", "bar"). + Suffix("baz"). + Timeout(time.Second). + Body([]byte(strings.Repeat("abcd", 1000))). + DoRaw() + if err != nil { + t.Fatalf("Unexpected error: %v %#v", err, err) + } + <-ch + if count != 5 { + t.Errorf("unexpected retries: %d", count) + } +} + +func BenchmarkCheckRetryClosesBody(t *testing.B) { + count := 0 + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + count++ + if count%3 == 0 { + w.WriteHeader(http.StatusOK) + return + } + w.Header().Set("Retry-After", "0") + w.WriteHeader(apierrors.StatusTooManyRequests) + })) + defer testServer.Close() + + c := NewOrDie(&Config{Host: testServer.URL, Version: "v1beta2", Username: "user", Password: "pass"}) + r := c.Verb("POST"). + Prefix("foo", "bar"). + Suffix("baz"). + Timeout(time.Second). + Body([]byte(strings.Repeat("abcd", 1000))) + + for i := 0; i < t.N; i++ { + if _, err := r.DoRaw(); err != nil { + t.Fatalf("Unexpected error: %v %#v", err, err) + } + } +} func TestDoRequestNewWayReader(t *testing.T) { reqObj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} reqBodyExpected, _ := v1beta1.Codec.Encode(reqObj)