Reorganized Request.DoRaw() to guarantee streams are closed

Added a test to verify 429 triggers a client retry based on
header. Forced resp.Body.Close() to close after each request,
which allows Golang to reuse the TCP connection to the server
(should reduce connection establishment under retries).
Possibly fixed a server leak of request bodies.
This commit is contained in:
Clayton Coleman 2015-04-10 01:10:35 -04:00
parent 4f3f19511f
commit 852ef7b637
2 changed files with 149 additions and 67 deletions

View File

@ -510,8 +510,8 @@ func (r *Request) Watch() (watch.Interface, error) {
return nil, err
}
if resp.StatusCode != http.StatusOK {
if _, _, err := r.transformResponse(resp, req, nil); err != nil {
return nil, err
if result := r.transformResponse(resp, req); result.err != nil {
return nil, result.err
}
return nil, fmt.Errorf("for request '%+v', got status: %v", req.URL, resp.StatusCode)
}
@ -619,8 +619,23 @@ func (r *Request) Upgrade(config *Config, newRoundTripperFunc func(*tls.Config)
return upgradeRoundTripper.NewConnection(resp)
}
// DoRaw executes a raw request which is not subject to interpretation as an API response.
func (r *Request) DoRaw() ([]byte, error) {
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It wil invoke
// fn at most once. It will return an error if a problem occured prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
if r.err != nil {
return r.err
}
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set during creation")
}
client := r.client
if client == nil {
client = http.DefaultClient
@ -628,52 +643,38 @@ func (r *Request) DoRaw() ([]byte, error) {
// Right now we make about ten retry attempts if we get a Retry-After response.
// TODO: Change to a timeout based approach.
maxRetries := 10
retries := 0
for {
if r.err != nil {
return nil, r.err
}
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return nil, fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return nil, fmt.Errorf("an empty namespace may not be set during creation")
}
var err error
r.req, err = http.NewRequest(r.verb, r.finalURL(), r.body)
url := r.finalURL()
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
if err != nil {
return nil, err
return err
}
r.req.Header = r.headers
r.resp, err = client.Do(r.req)
if err != nil {
return nil, err
}
defer r.resp.Body.Close()
req.Header = r.headers
// Check to see if we got a 429 Too Many Requests response code.
if r.resp.StatusCode == errors.StatusTooManyRequests {
if retries < 10 {
retries++
if waitFor := r.resp.Header.Get("Retry-After"); waitFor != "" {
delay, err := strconv.Atoi(waitFor)
if err == nil {
glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", waitFor, retries, r.finalURL())
time.Sleep(time.Duration(delay) * time.Second)
continue
}
}
resp, err := client.Do(req)
if err != nil {
return err
}
done := func() bool {
// ensure the response body is closed before we reconnect, so that we reuse the same
// TCP connection
defer resp.Body.Close()
retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url)
time.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
return true
}()
if done {
return nil
}
body, err := ioutil.ReadAll(r.resp.Body)
if err != nil {
return nil, err
}
return body, err
}
}
@ -684,25 +685,42 @@ func (r *Request) DoRaw() ([]byte, error) {
// * If the request can't be constructed, or an error happened earlier while building its
// arguments: *RequestConstructionError
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// * If the status code and body don't make sense together: *UnexpectedStatusError
// * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {
start := time.Now()
defer func() {
metrics.RequestLatency.WithLabelValues(r.verb, r.finalURLTemplate()).Observe(metrics.SinceInMicroseconds(start))
}()
body, err := r.DoRaw()
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
respBody, created, err := r.transformResponse(r.resp, r.req, body)
return Result{respBody, created, err, r.codec}
return result
}
// transformResponse converts an API response into a structured API object. If body is nil, the response
// body will be read to try and gather more response data.
func (r *Request) transformResponse(resp *http.Response, req *http.Request, body []byte) ([]byte, bool, error) {
if body == nil && resp.Body != nil {
// DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw() ([]byte, error) {
start := time.Now()
defer func() {
metrics.RequestLatency.WithLabelValues(r.verb, r.finalURLTemplate()).Observe(metrics.SinceInMicroseconds(start))
}()
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
result.body, result.err = ioutil.ReadAll(resp.Body)
})
if err != nil {
return nil, err
}
return result.body, result.err
}
// transformResponse converts an API response into a structured API object
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
var body []byte
if resp.Body != nil {
if data, err := ioutil.ReadAll(resp.Body); err == nil {
body = data
}
@ -719,21 +737,23 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request, body
// no-op, we've been upgraded
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
if !isStatusResponse {
return nil, false, r.transformUnstructuredResponseError(resp, req, body)
return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
}
return nil, false, errors.FromObject(&status)
return Result{err: errors.FromObject(&status)}
}
// If the server gave us a status back, look at what it was.
success := resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent
if isStatusResponse && (status.Status != api.StatusSuccess && !success) {
// "Working" requests need to be handled specially.
// "Failed" requests are clearly just an error and it makes sense to return them as such.
return nil, false, errors.FromObject(&status)
return Result{err: errors.FromObject(&status)}
}
created := resp.StatusCode == http.StatusCreated
return body, created, nil
return Result{
body: body,
created: resp.StatusCode == http.StatusCreated,
codec: r.codec,
}
}
// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
@ -781,6 +801,16 @@ func isTextResponse(resp *http.Response) bool {
return strings.HasPrefix(media, "text/")
}
// checkWait returns true along with a number of seconds if the server instructed us to wait
// before retrying.
func checkWait(resp *http.Response) (int, bool) {
if resp.StatusCode != errors.StatusTooManyRequests {
return 0, false
}
i, ok := retryAfterSeconds(resp)
return i, ok
}
// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
// the header was missing or not a valid number.
func retryAfterSeconds(resp *http.Response) (int, bool) {

View File

@ -266,11 +266,8 @@ func TestTransformResponse(t *testing.T) {
if test.Response.Body == nil {
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
}
body, err := ioutil.ReadAll(test.Response.Body)
if err != nil {
t.Errorf("failed to read body of response: %v", err)
}
response, created, err := r.transformResponse(test.Response, &http.Request{}, body)
result := r.transformResponse(test.Response, &http.Request{})
response, created, err := result.body, result.created, result.err
hasErr := err != nil
if hasErr != test.Error {
t.Errorf("%d: unexpected error: %t %v", i, test.Error, err)
@ -356,11 +353,8 @@ func TestTransformUnstructuredError(t *testing.T) {
resourceName: testCase.Name,
resource: testCase.Resource,
}
body, err := ioutil.ReadAll(testCase.Res.Body)
if err != nil {
t.Errorf("failed to read body: %v", err)
}
_, _, err = r.transformResponse(testCase.Res, testCase.Req, body)
result := r.transformResponse(testCase.Res, testCase.Req)
err := result.err
if !testCase.ErrFn(err) {
t.Errorf("unexpected error: %v", err)
continue
@ -748,6 +742,64 @@ func TestDoRequestNewWay(t *testing.T) {
}
}
func TestCheckRetryClosesBody(t *testing.T) {
count := 0
ch := make(chan struct{})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
count++
t.Logf("attempt %d", count)
if count >= 5 {
w.WriteHeader(http.StatusOK)
close(ch)
return
}
w.Header().Set("Retry-After", "0")
w.WriteHeader(apierrors.StatusTooManyRequests)
}))
defer testServer.Close()
c := NewOrDie(&Config{Host: testServer.URL, Version: "v1beta2", Username: "user", Password: "pass"})
_, err := c.Verb("POST").
Prefix("foo", "bar").
Suffix("baz").
Timeout(time.Second).
Body([]byte(strings.Repeat("abcd", 1000))).
DoRaw()
if err != nil {
t.Fatalf("Unexpected error: %v %#v", err, err)
}
<-ch
if count != 5 {
t.Errorf("unexpected retries: %d", count)
}
}
func BenchmarkCheckRetryClosesBody(t *testing.B) {
count := 0
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
count++
if count%3 == 0 {
w.WriteHeader(http.StatusOK)
return
}
w.Header().Set("Retry-After", "0")
w.WriteHeader(apierrors.StatusTooManyRequests)
}))
defer testServer.Close()
c := NewOrDie(&Config{Host: testServer.URL, Version: "v1beta2", Username: "user", Password: "pass"})
r := c.Verb("POST").
Prefix("foo", "bar").
Suffix("baz").
Timeout(time.Second).
Body([]byte(strings.Repeat("abcd", 1000)))
for i := 0; i < t.N; i++ {
if _, err := r.DoRaw(); err != nil {
t.Fatalf("Unexpected error: %v %#v", err, err)
}
}
}
func TestDoRequestNewWayReader(t *testing.T) {
reqObj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
reqBodyExpected, _ := v1beta1.Codec.Encode(reqObj)