Merge pull request #36001 from smarterclayton/change_double_decode

Automatic merge from submit-queue

Avoid double decoding all client responses

Fixes #35982 

The linked issue uncovered that we were always double decoding the response in restclient for get, list, update, create, and patch.  That's fairly expensive, most especially for list.  This PR refines the behavior of the rest client to avoid double decoding, and does so while minimizing the changes to rest client consumers.

restclient must be able to deal with multiple types of servers. Alter the behavior of restclient.Result#Raw() to not process the body on error, but instead to return the generic error (which still matches the error checking cases in api/error like IsBadRequest). If the caller uses
.Error(), .Into(), or .Get(), try decoding the body as a Status.

For older servers, continue to default apiVersion "v1" when calling restclient.Result#Error(). This was only for 1.1 servers and the extensions group, which we have since fixed.

This removes a double decode of very large objects (like LIST) - we were trying to DecodeInto status, but that ends up decoding the entire result and then throwing it away.  This makes the decode behavior specific to the type of action the user wants.

```release-note
The error handling behavior of `pkg/client/restclient.Result` has changed.  Calls to `Result.Raw()` will no longer parse the body, although they will still return errors that react to `pkg/api/errors.Is*()` as in previous releases.  Callers of `Get()` and `Into()` will continue to receive errors that are parsed from the body if the kind and apiVersion of the body match the `Status` object.

This more closely aligns rest client as a generic RESTful client, while preserving the special Kube API extended error handling for the `Get` and `Into` methods (which most Kube clients use).
```
This commit is contained in:
Kubernetes Submit Queue 2016-11-02 11:36:41 -07:00 committed by GitHub
commit 7d14b568c3
18 changed files with 278 additions and 56 deletions

View File

@ -50,6 +50,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&ClusterInfo{},
&api.ListOptions{},
&api.DeleteOptions{},
&api.ExportOptions{},
)
return nil
}

View File

@ -50,6 +50,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&ClusterInfo{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
)
return nil
}

View File

@ -40,6 +40,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&ClusterList{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
)
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil

View File

@ -244,16 +244,38 @@ var nonRoundTrippableTypes = sets.NewString(
"WatchEvent",
)
var commonKinds = []string{"ListOptions", "DeleteOptions"}
var commonKinds = []string{"Status", "ListOptions", "DeleteOptions", "ExportOptions"}
// verify all external group/versions have the common kinds like the ListOptions, DeleteOptions are registered.
// verify all external group/versions have the common kinds registered.
func TestCommonKindsRegistered(t *testing.T) {
for _, kind := range commonKinds {
for _, group := range testapi.Groups {
gv := group.GroupVersion()
if _, err := api.Scheme.New(gv.WithKind(kind)); err != nil {
gvk := gv.WithKind(kind)
obj, err := api.Scheme.New(gvk)
if err != nil {
t.Error(err)
}
defaults := gv.WithKind("")
if _, got, err := api.Codecs.LegacyCodec().Decode([]byte(`{"kind":"`+kind+`"}`), &defaults, nil); err != nil || gvk != *got {
t.Errorf("expected %v: %v %v", gvk, got, err)
}
data, err := runtime.Encode(api.Codecs.LegacyCodec(*gv), obj)
if err != nil {
t.Errorf("expected %v: %v\n%s", gvk, err, string(data))
continue
}
if !bytes.Contains(data, []byte(`"kind":"`+kind+`","apiVersion":"`+gv.String()+`"`)) {
if kind != "Status" {
t.Errorf("expected %v: %v\n%s", gvk, err, string(data))
continue
}
// TODO: this is wrong, but legacy clients expect it
if !bytes.Contains(data, []byte(`"kind":"`+kind+`","apiVersion":"v1"`)) {
t.Errorf("expected %v: %v\n%s", gvk, err, string(data))
continue
}
}
}
}
}

View File

@ -27,6 +27,7 @@ go_library(
"//pkg/apis/apps:go_default_library",
"//pkg/apis/apps/install:go_default_library",
"//pkg/apis/authentication/install:go_default_library",
"//pkg/apis/authorization:go_default_library",
"//pkg/apis/authorization/install:go_default_library",
"//pkg/apis/autoscaling:go_default_library",
"//pkg/apis/autoscaling/install:go_default_library",

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/authorization"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/certificates"
@ -66,18 +67,19 @@ import (
)
var (
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Policy TestGroup
Federation TestGroup
Rbac TestGroup
Certificates TestGroup
Storage TestGroup
ImagePolicy TestGroup
Groups = make(map[string]TestGroup)
Default TestGroup
Authorization TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Policy TestGroup
Federation TestGroup
Rbac TestGroup
Certificates TestGroup
Storage TestGroup
ImagePolicy TestGroup
serializer runtime.SerializerInfo
storageSerializer runtime.SerializerInfo
@ -258,6 +260,15 @@ func init() {
externalTypes: api.Scheme.KnownTypes(externalGroupVersion),
}
}
if _, ok := Groups[authorization.GroupName]; !ok {
externalGroupVersion := unversioned.GroupVersion{Group: authorization.GroupName, Version: registered.GroupOrDie(authorization.GroupName).GroupVersion.Version}
Groups[authorization.GroupName] = TestGroup{
externalGroupVersion: externalGroupVersion,
internalGroupVersion: authorization.SchemeGroupVersion,
internalTypes: api.Scheme.KnownTypes(authorization.SchemeGroupVersion),
externalTypes: api.Scheme.KnownTypes(externalGroupVersion),
}
}
if _, ok := Groups[kubeadm.GroupName]; !ok {
externalGroupVersion := unversioned.GroupVersion{Group: kubeadm.GroupName, Version: registered.GroupOrDie(kubeadm.GroupName).GroupVersion.Version}
Groups[kubeadm.GroupName] = TestGroup{
@ -279,6 +290,7 @@ func init() {
Rbac = Groups[rbac.GroupName]
Storage = Groups[storage.GroupName]
ImagePolicy = Groups[imagepolicy.GroupName]
Authorization = Groups[authorization.GroupName]
}
func (g TestGroup) ContentConfig() (string, *unversioned.GroupVersion, runtime.Codec) {

View File

@ -41,6 +41,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&StatefulSetList{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
)
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil

View File

@ -39,6 +39,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
&SelfSubjectAccessReview{},
&SubjectAccessReview{},

View File

@ -42,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&Scale{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
)
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil

View File

@ -41,6 +41,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&JobList{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
)
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil

View File

@ -51,6 +51,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&CertificateSigningRequestList{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
)
// Add the watch version that applies

View File

@ -56,6 +56,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&IngressList{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
&ReplicaSet{},
&ReplicaSetList{},
&PodSecurityPolicy{},

View File

@ -42,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&Eviction{},
&v1.ListOptions{},
&v1.DeleteOptions{},
&v1.ExportOptions{},
)
// Add the watch version that applies
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)

View File

@ -79,13 +79,52 @@ func TestDoRequestFailed(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
c, err := restClient(testServer)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = c.Get().Do().Error()
if err == nil {
t.Errorf("unexpected non-error")
}
ss, ok := err.(errors.APIStatus)
if !ok {
t.Errorf("unexpected error type %v", err)
}
actual := ss.Status()
if !reflect.DeepEqual(status, &actual) {
t.Errorf("Unexpected mis-match: %s", diff.ObjectReflectDiff(status, &actual))
}
}
func TestDoRawRequestFailed(t *testing.T) {
status := &unversioned.Status{
Code: http.StatusNotFound,
Status: unversioned.StatusFailure,
Reason: unversioned.StatusReasonNotFound,
Message: "the server could not find the requested resource",
Details: &unversioned.StatusDetails{
Causes: []unversioned.StatusCause{
{Type: unversioned.CauseTypeUnexpectedServerResponse, Message: "unknown"},
},
},
}
expectedBody, _ := runtime.Encode(testapi.Default.Codec(), status)
fakeHandler := utiltesting.FakeHandler{
StatusCode: 404,
ResponseBody: string(expectedBody),
T: t,
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
c, err := restClient(testServer)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
body, err := c.Get().Do().Raw()
if err == nil || body != nil {
if err == nil || body == nil {
t.Errorf("unexpected non-error: %#v", body)
}
ss, ok := err.(errors.APIStatus)
@ -93,12 +132,8 @@ func TestDoRequestFailed(t *testing.T) {
t.Errorf("unexpected error type %v", err)
}
actual := ss.Status()
expected := *status
// The decoder will apply the default Version and Kind to the Status.
expected.APIVersion = "v1"
expected.Kind = "Status"
if !reflect.DeepEqual(&expected, &actual) {
t.Errorf("Unexpected mis-match: %s", diff.ObjectDiff(status, &actual))
if !reflect.DeepEqual(status, &actual) {
t.Errorf("Unexpected mis-match: %s", diff.ObjectReflectDiff(status, &actual))
}
}

View File

@ -930,33 +930,21 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
}
}
// Did the server give us a status response?
isStatusResponse := false
status := &unversioned.Status{}
// Because release-1.1 server returns Status with empty APIVersion at paths
// to the Extensions resources, we need to use DecodeInto here to provide
// default groupVersion, otherwise a status response won't be correctly
// decoded.
err := runtime.DecodeInto(decoder, body, status)
if err == nil && len(status.Status) > 0 {
isStatusResponse = true
}
switch {
case resp.StatusCode == http.StatusSwitchingProtocols:
// no-op, we've been upgraded
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
if !isStatusResponse {
return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
// calculate an unstructured error from the response which the Result object may use if the caller
// did not return a structured error.
retryAfter, _ := retryAfterSeconds(resp)
err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
return Result{
body: body,
contentType: contentType,
statusCode: resp.StatusCode,
decoder: decoder,
err: err,
}
return Result{err: errors.FromObject(status)}
}
// If the server gave us a status back, look at what it was.
success := resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent
if isStatusResponse && (status.Status != unversioned.StatusSuccess && !success) {
// "Failed" requests are clearly just an error and it makes sense to return them as such.
return Result{err: errors.FromObject(status)}
}
return Result{
@ -967,6 +955,9 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
}
}
// maxUnstructuredResponseTextBytes is an upper bound on how much output to include in the unstructured error.
const maxUnstructuredResponseTextBytes = 2048
// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
// It is expected to transform any response that is not recognizable as a clear server sent error from the
// K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
@ -987,20 +978,29 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
// TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
if body == nil && resp.Body != nil {
if data, err := ioutil.ReadAll(resp.Body); err == nil {
if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
body = data
}
}
retryAfter, _ := retryAfterSeconds(resp)
return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
}
// newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
// cap the amount of output we create
if len(body) > maxUnstructuredResponseTextBytes {
body = body[:maxUnstructuredResponseTextBytes]
}
glog.V(8).Infof("Response Body: %#v", string(body))
message := "unknown"
if isTextResponse(resp) {
if isTextResponse {
message = strings.TrimSpace(string(body))
}
retryAfter, _ := retryAfterSeconds(resp)
return errors.NewGenericServerResponse(
resp.StatusCode,
req.Method,
statusCode,
method,
unversioned.GroupResource{
Group: r.content.GroupVersion.Group,
Resource: r.resource,
@ -1064,15 +1064,31 @@ func (r Result) Raw() ([]byte, error) {
return r.body, r.err
}
// Get returns the result as an object.
// Get returns the result as an object, which means it passes through the decoder.
// If the returned object is of type Status and has .Status != StatusSuccess, the
// additional information in Status will be used to enrich the error.
func (r Result) Get() (runtime.Object, error) {
if r.err != nil {
return nil, r.err
// Check whether the result has a Status object in the body and prefer that.
return nil, r.Error()
}
if r.decoder == nil {
return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
}
return runtime.Decode(r.decoder, r.body)
// decode, but if the result is Status return that as an error instead.
out, _, err := r.decoder.Decode(r.body, nil, nil)
if err != nil {
return nil, err
}
switch t := out.(type) {
case *unversioned.Status:
// any status besides StatusSuccess is considered an error.
if t.Status != unversioned.StatusSuccess {
return nil, errors.FromObject(t)
}
}
return out, nil
}
// StatusCode returns the HTTP status code of the request. (Only valid if no
@ -1083,14 +1099,31 @@ func (r Result) StatusCode(statusCode *int) Result {
}
// Into stores the result into obj, if possible. If obj is nil it is ignored.
// If the returned object is of type Status and has .Status != StatusSuccess, the
// additional information in Status will be used to enrich the error.
func (r Result) Into(obj runtime.Object) error {
if r.err != nil {
return r.err
// Check whether the result has a Status object in the body and prefer that.
return r.Error()
}
if r.decoder == nil {
return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
}
return runtime.DecodeInto(r.decoder, r.body, obj)
out, _, err := r.decoder.Decode(r.body, nil, obj)
if err != nil || out == obj {
return err
}
// if a different object is returned, see if it is Status and avoid double decoding
// the object.
switch t := out.(type) {
case *unversioned.Status:
// any status besides StatusSuccess is considered an error.
if t.Status != unversioned.StatusSuccess {
return errors.FromObject(t)
}
}
return nil
}
// WasCreated updates the provided bool pointer to whether the server returned
@ -1101,7 +1134,29 @@ func (r Result) WasCreated(wasCreated *bool) Result {
}
// Error returns the error executing the request, nil if no error occurred.
// If the returned object is of type Status and has Status != StatusSuccess, the
// additional information in Status will be used to enrich the error.
// See the Request.Do() comment for what errors you might get.
func (r Result) Error() error {
// if we have received an unexpected server error, and we have a body and decoder, we can try to extract
// a Status object.
if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
return r.err
}
// attempt to convert the body into a Status object
// to be backwards compatible with old servers that do not return a version, default to "v1"
out, _, err := r.decoder.Decode(r.body, &unversioned.GroupVersionKind{Version: "v1"}, nil)
if err != nil {
glog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
return r.err
}
switch t := out.(type) {
case *unversioned.Status:
// because we default the kind, we *must* check for StatusFailure
if t.Status == unversioned.StatusFailure {
return errors.FromObject(t)
}
}
return r.err
}

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util/clock"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/intstr"
@ -582,7 +583,8 @@ func TestTransformUnstructuredError(t *testing.T) {
Resource string
Name string
ErrFn func(error) bool
ErrFn func(error) bool
Transformed error
}{
{
Resource: "foo",
@ -626,9 +628,46 @@ func TestTransformUnstructuredError(t *testing.T) {
},
ErrFn: apierrors.IsBadRequest,
},
{
// status in response overrides transformed result
Req: &http.Request{},
Res: &http.Response{StatusCode: http.StatusBadRequest, Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","status":"Failure","code":404}`)))},
ErrFn: apierrors.IsBadRequest,
Transformed: &apierrors.StatusError{
ErrStatus: unversioned.Status{Status: unversioned.StatusFailure, Code: http.StatusNotFound},
},
},
{
// successful status is ignored
Req: &http.Request{},
Res: &http.Response{StatusCode: http.StatusBadRequest, Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","status":"Success","code":404}`)))},
ErrFn: apierrors.IsBadRequest,
},
{
// empty object does not change result
Req: &http.Request{},
Res: &http.Response{StatusCode: http.StatusBadRequest, Body: ioutil.NopCloser(bytes.NewReader([]byte(`{}`)))},
ErrFn: apierrors.IsBadRequest,
},
{
// we default apiVersion for backwards compatibility with old clients
// TODO: potentially remove in 1.7
Req: &http.Request{},
Res: &http.Response{StatusCode: http.StatusBadRequest, Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","status":"Failure","code":404}`)))},
ErrFn: apierrors.IsBadRequest,
Transformed: &apierrors.StatusError{
ErrStatus: unversioned.Status{Status: unversioned.StatusFailure, Code: http.StatusNotFound},
},
},
{
// we do not default kind
Req: &http.Request{},
Res: &http.Response{StatusCode: http.StatusBadRequest, Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"status":"Failure","code":404}`)))},
ErrFn: apierrors.IsBadRequest,
},
}
for _, testCase := range testCases {
for i, testCase := range testCases {
r := &Request{
content: defaultContentConfig(),
serializers: defaultSerializers(),
@ -641,12 +680,40 @@ func TestTransformUnstructuredError(t *testing.T) {
t.Errorf("unexpected error: %v", err)
continue
}
if !apierrors.IsUnexpectedServerError(err) {
t.Errorf("%d: unexpected error type: %v", i, err)
}
if len(testCase.Name) != 0 && !strings.Contains(err.Error(), testCase.Name) {
t.Errorf("unexpected error string: %s", err)
}
if len(testCase.Resource) != 0 && !strings.Contains(err.Error(), testCase.Resource) {
t.Errorf("unexpected error string: %s", err)
}
// verify Error() properly transforms the error
transformed := result.Error()
expect := testCase.Transformed
if expect == nil {
expect = err
}
if !reflect.DeepEqual(expect, transformed) {
t.Errorf("%d: unexpected Error(): %s", i, diff.ObjectReflectDiff(expect, transformed))
}
// verify result.Get properly transforms the error
if _, err := result.Get(); !reflect.DeepEqual(expect, err) {
t.Errorf("%d: unexpected error on Get(): %s", i, diff.ObjectReflectDiff(expect, err))
}
// verify result.Into properly handles the error
if err := result.Into(&api.Pod{}); !reflect.DeepEqual(expect, err) {
t.Errorf("%d: unexpected error on Into(): %s", i, diff.ObjectReflectDiff(expect, err))
}
// verify result.Raw leaves the error in the untransformed state
if _, err := result.Raw(); !reflect.DeepEqual(result.err, err) {
t.Errorf("%d: unexpected error on Raw(): %s", i, diff.ObjectReflectDiff(expect, err))
}
}
}

View File

@ -519,6 +519,15 @@ func (s *Scheme) convertToVersion(copy bool, in Object, target GroupVersioner) (
gvk, ok := target.KindForGroupVersionKinds(kinds)
if !ok {
// try to see if this type is listed as unversioned (for legacy support)
// TODO: when we move to server API versions, we should completely remove the unversioned concept
if unversionedKind, ok := s.unversionedTypes[t]; ok {
if gvk, ok := target.KindForGroupVersionKinds([]unversioned.GroupVersionKind{unversionedKind}); ok {
return copyAndSetTargetKind(copy, s, in, gvk)
}
return copyAndSetTargetKind(copy, s, in, unversionedKind)
}
// TODO: should this be a typed error?
return nil, fmt.Errorf("%v is not suitable for converting to %q", t, target)
}

View File

@ -625,6 +625,17 @@ func TestConvertToVersion(t *testing.T) {
A: "test",
},
},
// unversioned type returned when not included in the target types
{
scheme: GetTestScheme(),
in: &UnversionedType{A: "test"},
gv: unversioned.GroupVersions{{Group: "other", Version: "v2"}},
same: true,
out: &UnversionedType{
MyWeirdCustomEmbeddedVersionKindField: MyWeirdCustomEmbeddedVersionKindField{APIVersion: "v1", ObjectKind: "UnversionedType"},
A: "test",
},
},
// detected as already being in the target version
{
scheme: GetTestScheme(),