mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
Merge pull request #6671 from smarterclayton/ensure_body_is_closed
Reorganized Request.DoRaw() to guarantee streams are closed
This commit is contained in:
commit
8dfd7001f5
@ -510,8 +510,8 @@ func (r *Request) Watch() (watch.Interface, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
if _, _, err := r.transformResponse(resp, req, nil); err != nil {
|
if result := r.transformResponse(resp, req); result.err != nil {
|
||||||
return nil, err
|
return nil, result.err
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("for request '%+v', got status: %v", req.URL, resp.StatusCode)
|
return nil, fmt.Errorf("for request '%+v', got status: %v", req.URL, resp.StatusCode)
|
||||||
}
|
}
|
||||||
@ -619,8 +619,23 @@ func (r *Request) Upgrade(config *Config, newRoundTripperFunc func(*tls.Config)
|
|||||||
return upgradeRoundTripper.NewConnection(resp)
|
return upgradeRoundTripper.NewConnection(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DoRaw executes a raw request which is not subject to interpretation as an API response.
|
// request connects to the server and invokes the provided function when a server response is
|
||||||
func (r *Request) DoRaw() ([]byte, error) {
|
// received. It handles retry behavior and up front validation of requests. It wil invoke
|
||||||
|
// fn at most once. It will return an error if a problem occured prior to connecting to the
|
||||||
|
// server - the provided function is responsible for handling server errors.
|
||||||
|
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
||||||
|
if r.err != nil {
|
||||||
|
return r.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
|
||||||
|
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
|
||||||
|
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
|
||||||
|
}
|
||||||
|
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
|
||||||
|
return fmt.Errorf("an empty namespace may not be set during creation")
|
||||||
|
}
|
||||||
|
|
||||||
client := r.client
|
client := r.client
|
||||||
if client == nil {
|
if client == nil {
|
||||||
client = http.DefaultClient
|
client = http.DefaultClient
|
||||||
@ -628,54 +643,40 @@ func (r *Request) DoRaw() ([]byte, error) {
|
|||||||
|
|
||||||
// Right now we make about ten retry attempts if we get a Retry-After response.
|
// Right now we make about ten retry attempts if we get a Retry-After response.
|
||||||
// TODO: Change to a timeout based approach.
|
// TODO: Change to a timeout based approach.
|
||||||
|
maxRetries := 10
|
||||||
retries := 0
|
retries := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if r.err != nil {
|
url := r.finalURL()
|
||||||
return nil, r.err
|
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
|
|
||||||
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
|
|
||||||
return nil, fmt.Errorf("an empty namespace may not be set when a resource name is provided")
|
|
||||||
}
|
|
||||||
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
|
|
||||||
return nil, fmt.Errorf("an empty namespace may not be set during creation")
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
r.req, err = http.NewRequest(r.verb, r.finalURL(), r.body)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
r.req.Header = r.headers
|
req.Header = r.headers
|
||||||
r.resp, err = client.Do(r.req)
|
|
||||||
if err != nil {
|
resp, err := client.Do(req)
|
||||||
return nil, err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
defer r.resp.Body.Close()
|
}
|
||||||
|
|
||||||
|
done := func() bool {
|
||||||
|
// ensure the response body is closed before we reconnect, so that we reuse the same
|
||||||
|
// TCP connection
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
// Check to see if we got a 429 Too Many Requests response code.
|
|
||||||
if r.resp.StatusCode == errors.StatusTooManyRequests {
|
|
||||||
if retries < 10 {
|
|
||||||
retries++
|
retries++
|
||||||
if waitFor := r.resp.Header.Get("Retry-After"); waitFor != "" {
|
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
|
||||||
delay, err := strconv.Atoi(waitFor)
|
glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url)
|
||||||
if err == nil {
|
time.Sleep(time.Duration(seconds) * time.Second)
|
||||||
glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", waitFor, retries, r.finalURL())
|
return false
|
||||||
time.Sleep(time.Duration(delay) * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
fn(req, resp)
|
||||||
|
return true
|
||||||
|
}()
|
||||||
|
if done {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
body, err := ioutil.ReadAll(r.resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return body, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do formats and executes the request. Returns a Result object for easy response
|
// Do formats and executes the request. Returns a Result object for easy response
|
||||||
// processing.
|
// processing.
|
||||||
@ -684,25 +685,42 @@ func (r *Request) DoRaw() ([]byte, error) {
|
|||||||
// * If the request can't be constructed, or an error happened earlier while building its
|
// * If the request can't be constructed, or an error happened earlier while building its
|
||||||
// arguments: *RequestConstructionError
|
// arguments: *RequestConstructionError
|
||||||
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
|
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
|
||||||
// * If the status code and body don't make sense together: *UnexpectedStatusError
|
|
||||||
// * http.Client.Do errors are returned directly.
|
// * http.Client.Do errors are returned directly.
|
||||||
func (r *Request) Do() Result {
|
func (r *Request) Do() Result {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.RequestLatency.WithLabelValues(r.verb, r.finalURLTemplate()).Observe(metrics.SinceInMicroseconds(start))
|
metrics.RequestLatency.WithLabelValues(r.verb, r.finalURLTemplate()).Observe(metrics.SinceInMicroseconds(start))
|
||||||
}()
|
}()
|
||||||
body, err := r.DoRaw()
|
var result Result
|
||||||
|
err := r.request(func(req *http.Request, resp *http.Response) {
|
||||||
|
result = r.transformResponse(resp, req)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Result{err: err}
|
return Result{err: err}
|
||||||
}
|
}
|
||||||
respBody, created, err := r.transformResponse(r.resp, r.req, body)
|
return result
|
||||||
return Result{respBody, created, err, r.codec}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// transformResponse converts an API response into a structured API object. If body is nil, the response
|
// DoRaw executes the request but does not process the response body.
|
||||||
// body will be read to try and gather more response data.
|
func (r *Request) DoRaw() ([]byte, error) {
|
||||||
func (r *Request) transformResponse(resp *http.Response, req *http.Request, body []byte) ([]byte, bool, error) {
|
start := time.Now()
|
||||||
if body == nil && resp.Body != nil {
|
defer func() {
|
||||||
|
metrics.RequestLatency.WithLabelValues(r.verb, r.finalURLTemplate()).Observe(metrics.SinceInMicroseconds(start))
|
||||||
|
}()
|
||||||
|
var result Result
|
||||||
|
err := r.request(func(req *http.Request, resp *http.Response) {
|
||||||
|
result.body, result.err = ioutil.ReadAll(resp.Body)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result.body, result.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// transformResponse converts an API response into a structured API object
|
||||||
|
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
|
||||||
|
var body []byte
|
||||||
|
if resp.Body != nil {
|
||||||
if data, err := ioutil.ReadAll(resp.Body); err == nil {
|
if data, err := ioutil.ReadAll(resp.Body); err == nil {
|
||||||
body = data
|
body = data
|
||||||
}
|
}
|
||||||
@ -719,21 +737,23 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request, body
|
|||||||
// no-op, we've been upgraded
|
// no-op, we've been upgraded
|
||||||
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
|
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
|
||||||
if !isStatusResponse {
|
if !isStatusResponse {
|
||||||
return nil, false, r.transformUnstructuredResponseError(resp, req, body)
|
return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
|
||||||
}
|
}
|
||||||
return nil, false, errors.FromObject(&status)
|
return Result{err: errors.FromObject(&status)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the server gave us a status back, look at what it was.
|
// If the server gave us a status back, look at what it was.
|
||||||
success := resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent
|
success := resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent
|
||||||
if isStatusResponse && (status.Status != api.StatusSuccess && !success) {
|
if isStatusResponse && (status.Status != api.StatusSuccess && !success) {
|
||||||
// "Working" requests need to be handled specially.
|
|
||||||
// "Failed" requests are clearly just an error and it makes sense to return them as such.
|
// "Failed" requests are clearly just an error and it makes sense to return them as such.
|
||||||
return nil, false, errors.FromObject(&status)
|
return Result{err: errors.FromObject(&status)}
|
||||||
}
|
}
|
||||||
|
|
||||||
created := resp.StatusCode == http.StatusCreated
|
return Result{
|
||||||
return body, created, nil
|
body: body,
|
||||||
|
created: resp.StatusCode == http.StatusCreated,
|
||||||
|
codec: r.codec,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
|
// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
|
||||||
@ -781,6 +801,16 @@ func isTextResponse(resp *http.Response) bool {
|
|||||||
return strings.HasPrefix(media, "text/")
|
return strings.HasPrefix(media, "text/")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkWait returns true along with a number of seconds if the server instructed us to wait
|
||||||
|
// before retrying.
|
||||||
|
func checkWait(resp *http.Response) (int, bool) {
|
||||||
|
if resp.StatusCode != errors.StatusTooManyRequests {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
i, ok := retryAfterSeconds(resp)
|
||||||
|
return i, ok
|
||||||
|
}
|
||||||
|
|
||||||
// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
|
// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
|
||||||
// the header was missing or not a valid number.
|
// the header was missing or not a valid number.
|
||||||
func retryAfterSeconds(resp *http.Response) (int, bool) {
|
func retryAfterSeconds(resp *http.Response) (int, bool) {
|
||||||
|
@ -266,11 +266,8 @@ func TestTransformResponse(t *testing.T) {
|
|||||||
if test.Response.Body == nil {
|
if test.Response.Body == nil {
|
||||||
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
|
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
|
||||||
}
|
}
|
||||||
body, err := ioutil.ReadAll(test.Response.Body)
|
result := r.transformResponse(test.Response, &http.Request{})
|
||||||
if err != nil {
|
response, created, err := result.body, result.created, result.err
|
||||||
t.Errorf("failed to read body of response: %v", err)
|
|
||||||
}
|
|
||||||
response, created, err := r.transformResponse(test.Response, &http.Request{}, body)
|
|
||||||
hasErr := err != nil
|
hasErr := err != nil
|
||||||
if hasErr != test.Error {
|
if hasErr != test.Error {
|
||||||
t.Errorf("%d: unexpected error: %t %v", i, test.Error, err)
|
t.Errorf("%d: unexpected error: %t %v", i, test.Error, err)
|
||||||
@ -356,11 +353,8 @@ func TestTransformUnstructuredError(t *testing.T) {
|
|||||||
resourceName: testCase.Name,
|
resourceName: testCase.Name,
|
||||||
resource: testCase.Resource,
|
resource: testCase.Resource,
|
||||||
}
|
}
|
||||||
body, err := ioutil.ReadAll(testCase.Res.Body)
|
result := r.transformResponse(testCase.Res, testCase.Req)
|
||||||
if err != nil {
|
err := result.err
|
||||||
t.Errorf("failed to read body: %v", err)
|
|
||||||
}
|
|
||||||
_, _, err = r.transformResponse(testCase.Res, testCase.Req, body)
|
|
||||||
if !testCase.ErrFn(err) {
|
if !testCase.ErrFn(err) {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
continue
|
continue
|
||||||
@ -748,6 +742,64 @@ func TestDoRequestNewWay(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCheckRetryClosesBody(t *testing.T) {
|
||||||
|
count := 0
|
||||||
|
ch := make(chan struct{})
|
||||||
|
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
count++
|
||||||
|
t.Logf("attempt %d", count)
|
||||||
|
if count >= 5 {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
close(ch)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Retry-After", "0")
|
||||||
|
w.WriteHeader(apierrors.StatusTooManyRequests)
|
||||||
|
}))
|
||||||
|
defer testServer.Close()
|
||||||
|
|
||||||
|
c := NewOrDie(&Config{Host: testServer.URL, Version: "v1beta2", Username: "user", Password: "pass"})
|
||||||
|
_, err := c.Verb("POST").
|
||||||
|
Prefix("foo", "bar").
|
||||||
|
Suffix("baz").
|
||||||
|
Timeout(time.Second).
|
||||||
|
Body([]byte(strings.Repeat("abcd", 1000))).
|
||||||
|
DoRaw()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v %#v", err, err)
|
||||||
|
}
|
||||||
|
<-ch
|
||||||
|
if count != 5 {
|
||||||
|
t.Errorf("unexpected retries: %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkCheckRetryClosesBody(t *testing.B) {
|
||||||
|
count := 0
|
||||||
|
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
count++
|
||||||
|
if count%3 == 0 {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Retry-After", "0")
|
||||||
|
w.WriteHeader(apierrors.StatusTooManyRequests)
|
||||||
|
}))
|
||||||
|
defer testServer.Close()
|
||||||
|
|
||||||
|
c := NewOrDie(&Config{Host: testServer.URL, Version: "v1beta2", Username: "user", Password: "pass"})
|
||||||
|
r := c.Verb("POST").
|
||||||
|
Prefix("foo", "bar").
|
||||||
|
Suffix("baz").
|
||||||
|
Timeout(time.Second).
|
||||||
|
Body([]byte(strings.Repeat("abcd", 1000)))
|
||||||
|
|
||||||
|
for i := 0; i < t.N; i++ {
|
||||||
|
if _, err := r.DoRaw(); err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v %#v", err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
func TestDoRequestNewWayReader(t *testing.T) {
|
func TestDoRequestNewWayReader(t *testing.T) {
|
||||||
reqObj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
reqObj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||||
reqBodyExpected, _ := v1beta1.Codec.Encode(reqObj)
|
reqBodyExpected, _ := v1beta1.Codec.Encode(reqObj)
|
||||||
|
Loading…
Reference in New Issue
Block a user