diff --git a/pkg/registry/generic/rest/response_checker.go b/pkg/registry/generic/rest/response_checker.go new file mode 100644 index 00000000000..9213229df4d --- /dev/null +++ b/pkg/registry/generic/rest/response_checker.go @@ -0,0 +1,70 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + + "k8s.io/kubernetes/pkg/api/errors" +) + +// Check the http error status from a location URL. +// And convert an error into a structured API object. +// Finally ensure we close the body before returning the error +type HttpResponseChecker interface { + Check(resp *http.Response) error +} + +// Max length read from the response body of a location which returns error status +const ( + maxReadLength = 50000 +) + +// A generic http response checker to transform the error. +type GenericHttpResponseChecker struct { + Kind string + Name string +} + +func (checker GenericHttpResponseChecker) Check(resp *http.Response) error { + if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent { + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(io.LimitReader(resp.Body, maxReadLength)) + if err != nil { + return errors.NewInternalError(err) + } + bodyText := string(bodyBytes) + + switch { + case resp.StatusCode == http.StatusInternalServerError: + return errors.NewInternalError(fmt.Errorf("%s", bodyText)) + case resp.StatusCode == http.StatusBadRequest: + return errors.NewBadRequest(bodyText) + case resp.StatusCode == http.StatusNotFound: + return errors.NewGenericServerResponse(resp.StatusCode, "", checker.Kind, checker.Name, bodyText, 0, false) + } + return errors.NewGenericServerResponse(resp.StatusCode, "", checker.Kind, checker.Name, bodyText, 0, false) + } + return nil +} + +func NewGenericHttpResponseChecker(kind, name string) GenericHttpResponseChecker { + return GenericHttpResponseChecker{Kind: kind, Name: name} +} diff --git a/pkg/registry/generic/rest/response_checker_test.go b/pkg/registry/generic/rest/response_checker_test.go new file mode 100644 index 00000000000..ece5cd0a588 --- /dev/null +++ b/pkg/registry/generic/rest/response_checker_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "reflect" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/api/errors" +) + +func TestGenericHttpResponseChecker(t *testing.T) { + responseChecker := NewGenericHttpResponseChecker("Pod", "foo") + tests := []struct { + resp *http.Response + expectError bool + expected error + name string + }{ + { + resp: &http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString("Success")), + StatusCode: http.StatusOK, + }, + expectError: false, + name: "ok", + }, + { + resp: &http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString("Invalid request.")), + StatusCode: http.StatusBadRequest, + }, + expectError: true, + expected: errors.NewBadRequest("Invalid request."), + name: "bad request", + }, + { + resp: &http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString("Pod does not exist.")), + StatusCode: http.StatusInternalServerError, + }, + expectError: true, + expected: errors.NewInternalError(fmt.Errorf("%s", "Pod does not exist.")), + name: "internal server error", + }, + } + for _, test := range tests { + err := responseChecker.Check(test.resp) + if test.expectError && err == nil { + t.Error("unexpected non-error") + } + if !test.expectError && err != nil { + t.Errorf("unexpected error: %v", err) + } + if test.expectError && !reflect.DeepEqual(err, test.expected) { + t.Errorf("expected: %s, saw: %s", test.expected, err) + } + } +} + +func TestGenericHttpResponseCheckerLimitReader(t *testing.T) { + responseChecker := NewGenericHttpResponseChecker("Pod", "foo") + excessedString := strings.Repeat("a", (maxReadLength + 10000)) + resp := &http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString(excessedString)), + StatusCode: http.StatusBadRequest, + } + err := responseChecker.Check(resp) + if err == nil { + t.Error("unexpected non-error") + } + if len(err.Error()) != maxReadLength { + t.Errorf("expected lenth of error message: %d, saw: %d", maxReadLength, len(err.Error())) + } +} diff --git a/pkg/registry/generic/rest/streamer.go b/pkg/registry/generic/rest/streamer.go index ee303dbb862..6c5dedaba06 100644 --- a/pkg/registry/generic/rest/streamer.go +++ b/pkg/registry/generic/rest/streamer.go @@ -28,10 +28,11 @@ import ( // LocationStreamer is a resource that streams the contents of a particular // location URL type LocationStreamer struct { - Location *url.URL - Transport http.RoundTripper - ContentType string - Flush bool + Location *url.URL + Transport http.RoundTripper + ContentType string + Flush bool + ResponseChecker HttpResponseChecker } // a LocationStreamer must implement a rest.ResourceStreamer @@ -54,8 +55,15 @@ func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream client := &http.Client{Transport: transport} resp, err := client.Get(s.Location.String()) if err != nil { - return + return nil, false, "", err } + + if s.ResponseChecker != nil { + if err = s.ResponseChecker.Check(resp); err != nil { + return nil, false, "", err + } + } + contentType = s.ContentType if len(contentType) == 0 { contentType = resp.Header.Get("Content-Type") diff --git a/pkg/registry/generic/rest/streamer_test.go b/pkg/registry/generic/rest/streamer_test.go index 9b6d1bec8e0..c81c22439f3 100644 --- a/pkg/registry/generic/rest/streamer_test.go +++ b/pkg/registry/generic/rest/streamer_test.go @@ -24,7 +24,10 @@ import ( "net/http" "net/http/httptest" "net/url" + "reflect" "testing" + + "k8s.io/kubernetes/pkg/api/errors" ) func TestInputStreamReader(t *testing.T) { @@ -116,3 +119,29 @@ func TestInputStreamTransport(t *testing.T) { t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), message) } } + +func fakeInternalServerErrorTransport(mime, message string) http.RoundTripper { + content := fmt.Sprintf("HTTP/1.1 500 \"Internal Server Error\"\nContent-Type: %s\n\n%s", mime, message) + return &testTransport{body: content} +} + +func TestInputStreamInternalServerErrorTransport(t *testing.T) { + message := "Pod is in PodPending" + location, _ := url.Parse("http://www.example.com") + streamer := &LocationStreamer{ + Location: location, + Transport: fakeInternalServerErrorTransport("text/plain", message), + ResponseChecker: NewGenericHttpResponseChecker("", ""), + } + expectedError := errors.NewInternalError(fmt.Errorf("%s", message)) + + _, _, _, err := streamer.InputStream("", "") + if err == nil { + t.Errorf("unexpected non-error") + return + } + + if !reflect.DeepEqual(err, expectedError) { + t.Errorf("StreamInternalServerError does not match. Got: %s. Expected: %s.", err, expectedError) + } +} diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 4779e8aa950..ff72e654c65 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -241,10 +241,11 @@ func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtim return nil, err } return &genericrest.LocationStreamer{ - Location: location, - Transport: transport, - ContentType: "text/plain", - Flush: logOpts.Follow, + Location: location, + Transport: transport, + ContentType: "text/plain", + Flush: logOpts.Follow, + ResponseChecker: genericrest.NewGenericHttpResponseChecker("Pod", name), }, nil }