From 125ef6fbc8cc70477071936cd93d064d71c773e6 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 21 Dec 2015 00:15:35 -0500 Subject: [PATCH] Support content-type negotiation in the API server A NegotiatedSerializer is passed into the API installer (and ParameterCodec, which abstracts conversion of query params) that can be used to negotiate client/server request/response serialization. All error paths are now negotiation aware, and are at least minimally version aware. Watch is specially coded to only allow application/json - a follow up change will convert it to use negotiation. Ensure the swagger scheme will include supported serializations - this now includes application/yaml as a negotiated option. --- cmd/kube-apiserver/app/server.go | 1 - pkg/apiserver/api_installer.go | 54 +-- pkg/apiserver/apiserver.go | 166 ++++------ pkg/apiserver/apiserver_test.go | 223 +++++++++---- pkg/apiserver/errors.go | 39 +++ pkg/apiserver/negotiate.go | 116 +++++++ pkg/apiserver/negotiate_test.go | 252 ++++++++++++++ pkg/apiserver/proxy.go | 34 +- pkg/apiserver/resthandler.go | 309 ++++++++++-------- pkg/apiserver/resthandler_test.go | 3 +- pkg/apiserver/watch.go | 30 +- pkg/apiserver/watch_test.go | 28 ++ pkg/genericapiserver/genericapiserver.go | 13 + pkg/genericapiserver/genericapiserver_test.go | 1 + test/integration/master_test.go | 70 ++++ 15 files changed, 972 insertions(+), 367 deletions(-) create mode 100644 pkg/apiserver/negotiate.go create mode 100644 pkg/apiserver/negotiate_test.go diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index eb847e5ff75..ed55faaa6eb 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -34,7 +34,6 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" apiutil "k8s.io/kubernetes/pkg/api/util" "k8s.io/kubernetes/pkg/apimachinery/registered" diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 10126e5f691..819cec6a198 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -65,7 +65,13 @@ var errEmptyName = errors.NewBadRequest("name must be provided") func (a *APIInstaller) Install(ws *restful.WebService) (apiResources []unversioned.APIResource, errors []error) { errors = make([]error, 0) - proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info}) + proxyHandler := (&ProxyHandler{ + prefix: a.prefix + "/proxy/", + storage: a.group.Storage, + serializer: a.group.Serializer, + context: a.group.Context, + requestInfoResolver: a.info, + }) // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec. paths := make([]string, len(a.group.Storage)) @@ -93,9 +99,11 @@ func (a *APIInstaller) NewWebService() *restful.WebService { ws.Path(a.prefix) // a.prefix contains "prefix/group/version" ws.Doc("API at " + a.prefix) - // TODO: change to restful.MIME_JSON when we set content type in client + // Backwards compatibilty, we accepted objects with empty content-type at V1. + // If we stop using go-restful, we can default empty content-type to application/json on an + // endpoint by endpoint basis ws.Consumes("*/*") - ws.Produces(restful.MIME_JSON) + ws.Produces(a.group.Serializer.SupportedMediaTypes()...) ws.ApiVersion(a.group.GroupVersion.String()) return ws @@ -262,19 +270,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag getOptions runtime.Object versionedGetOptions runtime.Object getOptionsInternalKind unversioned.GroupVersionKind - getOptionsExternalKind unversioned.GroupVersionKind getSubpath bool - getSubpathKey string ) if isGetterWithOptions { - getOptions, getSubpath, getSubpathKey = getterWithOptions.NewGetOptions() + getOptions, getSubpath, _ = getterWithOptions.NewGetOptions() getOptionsInternalKind, err = a.group.Typer.ObjectKind(getOptions) if err != nil { return nil, err } - // TODO this should be a list of all the different external versions we can coerce into the internalKind - getOptionsExternalKind = optionsExternalVersion.WithKind(getOptionsInternalKind.Kind) - versionedGetOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(getOptionsInternalKind.Kind)) if err != nil { return nil, err @@ -286,19 +289,15 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag connectOptions runtime.Object versionedConnectOptions runtime.Object connectOptionsInternalKind unversioned.GroupVersionKind - connectOptionsExternalKind unversioned.GroupVersionKind connectSubpath bool - connectSubpathKey string ) if isConnecter { - connectOptions, connectSubpath, connectSubpathKey = connecter.NewConnectOptions() + connectOptions, connectSubpath, _ = connecter.NewConnectOptions() if connectOptions != nil { connectOptionsInternalKind, err = a.group.Typer.ObjectKind(connectOptions) if err != nil { return nil, err } - // TODO this should be a list of all the different external versions we can coerce into the internalKind - connectOptionsExternalKind = optionsExternalVersion.WithKind(connectOptionsInternalKind.Kind) versionedConnectOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(connectOptionsInternalKind.Kind)) } @@ -434,10 +433,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag // test/integration/auth_test.go is currently the most comprehensive status code test reqScope := RequestScope{ - ContextFunc: ctxFn, - Creater: a.group.Creater, - Convertor: a.group.Convertor, - Codec: mapping.Codec, + ContextFunc: ctxFn, + Serializer: a.group.Serializer, + ParameterCodec: a.group.ParameterCodec, + Creater: a.group.Creater, + Convertor: a.group.Convertor, Resource: a.group.GroupVersion.WithResource(resource), Subresource: subresource, @@ -454,7 +454,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag case "GET": // Get a resource. var handler restful.RouteFunction if isGetterWithOptions { - handler = GetResourceWithOptions(getterWithOptions, exporter, reqScope, getOptionsInternalKind, getOptionsExternalKind, getSubpath, getSubpathKey) + handler = GetResourceWithOptions(getterWithOptions, reqScope) } else { handler = GetResource(getter, exporter, reqScope) } @@ -467,7 +467,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("read"+namespaced+kind+strings.Title(subresource)). - Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). + Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...). Returns(http.StatusOK, "OK", versionedObject). Writes(versionedObject) if isGetterWithOptions { @@ -492,7 +492,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("list"+namespaced+kind+strings.Title(subresource)). - Produces("application/json"). + Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...). Returns(http.StatusOK, "OK", versionedList). Writes(versionedList) if err := addObjectParams(ws, route, versionedListOptions); err != nil { @@ -524,7 +524,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("replace"+namespaced+kind+strings.Title(subresource)). - Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). + Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...). Returns(http.StatusOK, "OK", versionedObject). Reads(versionedObject). Writes(versionedObject) @@ -541,7 +541,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Consumes(string(api.JSONPatchType), string(api.MergePatchType), string(api.StrategicMergePatchType)). Operation("patch"+namespaced+kind+strings.Title(subresource)). - Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). + Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...). Returns(http.StatusOK, "OK", versionedObject). Reads(unversioned.Patch{}). Writes(versionedObject) @@ -563,7 +563,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("create"+namespaced+kind+strings.Title(subresource)). - Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). + Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...). Returns(http.StatusOK, "OK", versionedObject). Reads(versionedObject). Writes(versionedObject) @@ -579,7 +579,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("delete"+namespaced+kind+strings.Title(subresource)). - Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). + Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...). Writes(versionedStatus). Returns(http.StatusOK, "OK", versionedStatus) if isGracefulDeleter { @@ -597,7 +597,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("deletecollection"+namespaced+kind+strings.Title(subresource)). - Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). + Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...). Writes(versionedStatus). Returns(http.StatusOK, "OK", versionedStatus) if err := addObjectParams(ws, route, versionedListOptions); err != nil { @@ -658,7 +658,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag doc = "connect " + method + " requests to " + subresource + " of " + kind } route := ws.Method(method).Path(action.Path). - To(ConnectResource(connecter, reqScope, admit, connectOptionsInternalKind, connectOptionsExternalKind, path, connectSubpath, connectSubpathKey)). + To(ConnectResource(connecter, reqScope, admit, path)). Filter(m). Doc(doc). Operation("connect" + strings.Title(strings.ToLower(method)) + namespaced + kind + strings.Title(subresource)). diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 2a3143f37cc..8f3c7aa898b 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -26,7 +26,6 @@ import ( "net/http" "path" rt "runtime" - "strconv" "strings" "time" @@ -36,7 +35,6 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apiserver/metrics" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/runtime" @@ -97,7 +95,9 @@ type APIGroupVersion struct { Mapper meta.RESTMapper - Codec runtime.Codec + Serializer runtime.NegotiatedSerializer + ParameterCodec runtime.ParameterCodec + Typer runtime.ObjectTyper Creater runtime.ObjectCreater Convertor runtime.ObjectConvertor @@ -126,7 +126,7 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) error { installer := g.newInstaller() ws := installer.NewWebService() apiResources, registrationErrors := installer.Install(ws) - AddSupportedResourcesWebService(ws, g.GroupVersion, apiResources) + AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, apiResources) container.Add(ws) return utilerrors.NewAggregate(registrationErrors) } @@ -150,7 +150,7 @@ func (g *APIGroupVersion) UpdateREST(container *restful.Container) error { return apierrors.NewInternalError(fmt.Errorf("unable to find an existing webservice for prefix %s", installer.prefix)) } apiResources, registrationErrors := installer.Install(ws) - AddSupportedResourcesWebService(ws, g.GroupVersion, apiResources) + AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, apiResources) return utilerrors.NewAggregate(registrationErrors) } @@ -194,12 +194,15 @@ func InstallLogsSupport(mux Mux) { mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))) } -func InstallRecoverHandler(container *restful.Container) { - container.RecoverHandler(logStackOnRecover) +// TODO: needs to perform response type negotiation, this is probably the wrong way to recover panics +func InstallRecoverHandler(s runtime.NegotiatedSerializer, container *restful.Container) { + container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) { + logStackOnRecover(s, panicReason, httpWriter) + }) } //TODO: Unify with RecoverPanics? -func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) { +func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{}, w http.ResponseWriter) { var buffer bytes.Buffer buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason)) for i := 2; ; i += 1 { @@ -211,125 +214,110 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) } glog.Errorln(buffer.String()) - // TODO: make status unversioned or plumb enough of the request to deduce the requested API version - errorJSON(apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", api.Resource(""), "", "", 0, false), registered.GroupOrDie(api.GroupName).Codec, httpWriter) + headers := http.Header{} + if ct := w.Header().Get("Content-Type"); len(ct) > 0 { + headers.Set("Accept", ct) + } + errorNegotiated(apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", api.Resource(""), "", "", 0, false), s, unversioned.GroupVersion{}, w, &http.Request{Header: headers}) } -func InstallServiceErrorHandler(container *restful.Container, requestResolver *RequestInfoResolver, apiVersions []string) { +func InstallServiceErrorHandler(s runtime.NegotiatedSerializer, container *restful.Container, requestResolver *RequestInfoResolver, apiVersions []string) { container.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) { - serviceErrorHandler(requestResolver, apiVersions, serviceErr, request, response) + serviceErrorHandler(s, requestResolver, apiVersions, serviceErr, request, response) }) } -func serviceErrorHandler(requestResolver *RequestInfoResolver, apiVersions []string, serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) { - requestInfo, err := requestResolver.GetRequestInfo(request.Request) - codec := registered.GroupOrDie(api.GroupName).Codec - if err == nil && requestInfo.APIVersion != "" { - // check if the api version is valid. - for _, version := range apiVersions { - if requestInfo.APIVersion == version { - // valid api version. - codec = runtime.CodecFor(api.Scheme, unversioned.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}) - break - } - } - } - - errorJSON(apierrors.NewGenericServerResponse(serviceErr.Code, "", api.Resource(""), "", "", 0, false), codec, response.ResponseWriter) +func serviceErrorHandler(s runtime.NegotiatedSerializer, requestResolver *RequestInfoResolver, apiVersions []string, serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) { + errorNegotiated(apierrors.NewGenericServerResponse(serviceErr.Code, "", api.Resource(""), "", "", 0, false), s, unversioned.GroupVersion{}, response.ResponseWriter, request.Request) } // Adds a service to return the supported api versions at the legacy /api. -func AddApiWebService(container *restful.Container, apiPrefix string, versions []string) { +func AddApiWebService(s runtime.NegotiatedSerializer, container *restful.Container, apiPrefix string, versions []string) { // TODO: InstallREST should register each version automatically - versionHandler := APIVersionHandler(versions[:]...) + versionHandler := APIVersionHandler(s, versions[:]...) ws := new(restful.WebService) ws.Path(apiPrefix) ws.Doc("get available API versions") ws.Route(ws.GET("/").To(versionHandler). Doc("get available API versions"). Operation("getAPIVersions"). - Produces(restful.MIME_JSON). - Consumes(restful.MIME_JSON)) + Produces(s.SupportedMediaTypes()...). + Consumes(s.SupportedMediaTypes()...)) container.Add(ws) } // Adds a service to return the supported api versions at /apis. -func AddApisWebService(container *restful.Container, apiPrefix string, f func() []unversioned.APIGroup) { - rootAPIHandler := RootAPIHandler(f) +func AddApisWebService(s runtime.NegotiatedSerializer, container *restful.Container, apiPrefix string, f func() []unversioned.APIGroup) { + rootAPIHandler := RootAPIHandler(s, f) ws := new(restful.WebService) ws.Path(apiPrefix) ws.Doc("get available API versions") ws.Route(ws.GET("/").To(rootAPIHandler). Doc("get available API versions"). Operation("getAPIVersions"). - Produces(restful.MIME_JSON). - Consumes(restful.MIME_JSON)) + Produces(s.SupportedMediaTypes()...). + Consumes(s.SupportedMediaTypes()...)) container.Add(ws) } // Adds a service to return the supported versions, preferred version, and name // of a group. E.g., a such web service will be registered at /apis/extensions. -func AddGroupWebService(container *restful.Container, path string, group unversioned.APIGroup) { - groupHandler := GroupHandler(group) +func AddGroupWebService(s runtime.NegotiatedSerializer, container *restful.Container, path string, group unversioned.APIGroup) { + groupHandler := GroupHandler(s, group) ws := new(restful.WebService) ws.Path(path) ws.Doc("get information of a group") ws.Route(ws.GET("/").To(groupHandler). Doc("get information of a group"). Operation("getAPIGroup"). - Produces(restful.MIME_JSON). - Consumes(restful.MIME_JSON)) + Produces(s.SupportedMediaTypes()...). + Consumes(s.SupportedMediaTypes()...)) container.Add(ws) } // Adds a service to return the supported resources, E.g., a such web service // will be registered at /apis/extensions/v1. -func AddSupportedResourcesWebService(ws *restful.WebService, groupVersion unversioned.GroupVersion, apiResources []unversioned.APIResource) { - resourceHandler := SupportedResourcesHandler(groupVersion, apiResources) +func AddSupportedResourcesWebService(s runtime.NegotiatedSerializer, ws *restful.WebService, groupVersion unversioned.GroupVersion, apiResources []unversioned.APIResource) { + resourceHandler := SupportedResourcesHandler(s, groupVersion, apiResources) ws.Route(ws.GET("/").To(resourceHandler). Doc("get available resources"). Operation("getAPIResources"). - Produces(restful.MIME_JSON). - Consumes(restful.MIME_JSON)) + Produces(s.SupportedMediaTypes()...). + Consumes(s.SupportedMediaTypes()...)) } // handleVersion writes the server's version information. func handleVersion(req *restful.Request, resp *restful.Response) { - // TODO: use restful's Response methods writeRawJSON(http.StatusOK, version.Get(), resp.ResponseWriter) } // APIVersionHandler returns a handler which will list the provided versions as available. -func APIVersionHandler(versions ...string) restful.RouteFunction { +func APIVersionHandler(s runtime.NegotiatedSerializer, versions ...string) restful.RouteFunction { return func(req *restful.Request, resp *restful.Response) { - // TODO: use restful's Response methods - writeJSON(http.StatusOK, api.Codec, &unversioned.APIVersions{Versions: versions}, resp.ResponseWriter, true) + writeNegotiated(s, unversioned.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &unversioned.APIVersions{Versions: versions}) } } // RootAPIHandler returns a handler which will list the provided groups and versions as available. -func RootAPIHandler(f func() []unversioned.APIGroup) restful.RouteFunction { +func RootAPIHandler(s runtime.NegotiatedSerializer, f func() []unversioned.APIGroup) restful.RouteFunction { return func(req *restful.Request, resp *restful.Response) { - // TODO: use restful's Response methods - writeJSON(http.StatusOK, api.Codec, &unversioned.APIGroupList{Groups: f()}, resp.ResponseWriter, true) + writeNegotiated(s, unversioned.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &unversioned.APIGroupList{Groups: f()}) } } // GroupHandler returns a handler which will return the api.GroupAndVersion of // the group. -func GroupHandler(group unversioned.APIGroup) restful.RouteFunction { +func GroupHandler(s runtime.NegotiatedSerializer, group unversioned.APIGroup) restful.RouteFunction { return func(req *restful.Request, resp *restful.Response) { - // TODO: use restful's Response methods - writeJSON(http.StatusOK, api.Codec, &group, resp.ResponseWriter, true) + writeNegotiated(s, unversioned.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &group) } } // SupportedResourcesHandler returns a handler which will list the provided resources as available. -func SupportedResourcesHandler(groupVersion unversioned.GroupVersion, apiResources []unversioned.APIResource) restful.RouteFunction { +func SupportedResourcesHandler(s runtime.NegotiatedSerializer, groupVersion unversioned.GroupVersion, apiResources []unversioned.APIResource) restful.RouteFunction { return func(req *restful.Request, resp *restful.Response) { - // TODO: use restful's Response methods - writeJSON(http.StatusOK, api.Codec, &unversioned.APIResourceList{GroupVersion: groupVersion.String(), APIResources: apiResources}, resp.ResponseWriter, true) + writeNegotiated(s, unversioned.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &unversioned.APIResourceList{GroupVersion: groupVersion.String(), APIResources: apiResources}) } } @@ -338,11 +326,11 @@ func SupportedResourcesHandler(groupVersion unversioned.GroupVersion, apiResourc // response. The Accept header and current API version will be passed in, and the output will be copied // directly to the response body. If content type is returned it is used, otherwise the content type will // be "application/octet-stream". All other objects are sent to standard JSON serialization. -func write(statusCode int, groupVersion unversioned.GroupVersion, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, req *http.Request) { +func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) { if stream, ok := object.(rest.ResourceStreamer); ok { - out, flush, contentType, err := stream.InputStream(groupVersion.String(), req.Header.Get("Accept")) + out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept")) if err != nil { - errorJSONFatal(err, codec, w) + errorNegotiated(err, s, gv, w, req) return } if out == nil { @@ -372,64 +360,38 @@ func write(statusCode int, groupVersion unversioned.GroupVersion, codec runtime. io.Copy(writer, out) return } - writeJSON(statusCode, codec, object, w, isPrettyPrint(req)) + writeNegotiated(s, gv, w, req, statusCode, object) } -func isPrettyPrint(req *http.Request) bool { - pp := req.URL.Query().Get("pretty") - if len(pp) > 0 { - pretty, _ := strconv.ParseBool(pp) - return pretty +// writeNegotiated renders an object in the content type negotiated by the client +func writeNegotiated(s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { + serializer, contentType, err := negotiateOutputSerializer(req, s) + if err != nil { + status := errToAPIStatus(err) + writeRawJSON(int(status.Code), status, w) + return } - userAgent := req.UserAgent() - // This covers basic all browers and cli http tools - if strings.HasPrefix(userAgent, "curl") || strings.HasPrefix(userAgent, "Wget") || strings.HasPrefix(userAgent, "Mozilla/5.0") { - return true - } - return false -} -// writeJSON renders an object as JSON to the response. -func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, pretty bool) { - w.Header().Set("Content-Type", "application/json") - // We send the status code before we encode the object, so if we error, the status code stays but there will - // still be an error object. This seems ok, the alternative is to validate the object before - // encoding, but this really should never happen, so it's wasted compute for every API request. + w.Header().Set("Content-Type", contentType) w.WriteHeader(statusCode) - if pretty { - prettyJSON(codec, object, w) - return - } - err := codec.EncodeToStream(object, w) - if err != nil { - errorJSONFatal(err, codec, w) + + encoder := s.EncoderForVersion(serializer, gv) + if err := encoder.EncodeToStream(object, w); err != nil { + errorJSONFatal(err, encoder, w) } } -func prettyJSON(codec runtime.Codec, object runtime.Object, w http.ResponseWriter) { - formatted := &bytes.Buffer{} - output, err := runtime.Encode(codec, object) - if err != nil { - errorJSONFatal(err, codec, w) - } - if err := json.Indent(formatted, output, "", " "); err != nil { - errorJSONFatal(err, codec, w) - return - } - w.Write(formatted.Bytes()) -} - -// errorJSON renders an error to the response. Returns the HTTP status code of the error. -func errorJSON(err error, codec runtime.Codec, w http.ResponseWriter) int { +// errorNegotiated renders an error to the response. Returns the HTTP status code of the error. +func errorNegotiated(err error, s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request) int { status := errToAPIStatus(err) code := int(status.Code) - writeJSON(code, codec, status, w, true) + writeNegotiated(s, gv, w, req, code, status) return code } // errorJSONFatal renders an error to the response, and if codec fails will render plaintext. // Returns the HTTP status code of the error. -func errorJSONFatal(err error, codec runtime.Codec, w http.ResponseWriter) int { +func errorJSONFatal(err error, codec runtime.Encoder, w http.ResponseWriter) int { util.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err)) status := errToAPIStatus(err) code := int(status.Code) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 503c6dc0253..2a9bff8302c 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" apierrs "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/unversioned" @@ -58,18 +59,21 @@ func convert(obj runtime.Object) (runtime.Object, error) { // This creates fake API versions, similar to api/latest.go. var testAPIGroup = "test.group" -var testInternalGroupVersion = unversioned.GroupVersion{Group: testAPIGroup, Version: ""} +var testInternalGroupVersion = unversioned.GroupVersion{Group: testAPIGroup, Version: runtime.APIVersionInternal} var testGroupVersion = unversioned.GroupVersion{Group: testAPIGroup, Version: "version"} var newGroupVersion = unversioned.GroupVersion{Group: testAPIGroup, Version: "version2"} var prefix = "apis" var grouplessGroupVersion = unversioned.GroupVersion{Group: "", Version: "v1"} +var grouplessInternalGroupVersion = unversioned.GroupVersion{Group: "", Version: runtime.APIVersionInternal} var grouplessPrefix = "api" -var grouplessCodec = runtime.CodecFor(api.Scheme, grouplessGroupVersion) var groupVersions = []unversioned.GroupVersion{grouplessGroupVersion, testGroupVersion, newGroupVersion} -var codec = runtime.CodecFor(api.Scheme, testGroupVersion) -var newCodec = runtime.CodecFor(api.Scheme, newGroupVersion) + +var codec = latest.Codecs.LegacyCodec(groupVersions...) +var grouplessCodec = latest.Codecs.LegacyCodec(grouplessGroupVersion) +var testCodec = latest.Codecs.LegacyCodec(testGroupVersion) +var newCodec = latest.Codecs.LegacyCodec(newGroupVersion) var accessor = meta.NewAccessor() var versioner runtime.ResourceVersioner = accessor @@ -82,19 +86,16 @@ func interfacesFor(version unversioned.GroupVersion) (*meta.VersionInterfaces, e switch version { case testGroupVersion: return &meta.VersionInterfaces{ - Codec: codec, ObjectConvertor: api.Scheme, MetadataAccessor: accessor, }, nil case newGroupVersion: return &meta.VersionInterfaces{ - Codec: newCodec, ObjectConvertor: api.Scheme, MetadataAccessor: accessor, }, nil case grouplessGroupVersion: return &meta.VersionInterfaces{ - Codec: grouplessCodec, ObjectConvertor: api.Scheme, MetadataAccessor: accessor, }, nil @@ -117,10 +118,13 @@ func addGrouplessTypes() { ResourceVersion string `json:"resourceVersion,omitempty"` TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` } - api.Scheme.AddKnownTypes( - grouplessGroupVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &unversioned.Status{}, - &ListOptions{}, &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) + api.Scheme.AddKnownTypes(grouplessGroupVersion, + &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &ListOptions{}, + &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) api.Scheme.AddKnownTypes(grouplessGroupVersion, &api.Pod{}) + api.Scheme.AddKnownTypes(grouplessInternalGroupVersion, + &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &api.ListOptions{}, + &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) } func addTestTypes() { @@ -133,11 +137,13 @@ func addTestTypes() { ResourceVersion string `json:"resourceVersion,omitempty"` TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` } - api.Scheme.AddKnownTypes( - testGroupVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &unversioned.Status{}, - &ListOptions{}, &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}, - &unversioned.ExportOptions{}) + api.Scheme.AddKnownTypes(testGroupVersion, + &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &ListOptions{}, + &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) api.Scheme.AddKnownTypes(testGroupVersion, &api.Pod{}) + api.Scheme.AddKnownTypes(testInternalGroupVersion, + &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &api.ListOptions{}, + &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) } func addNewTestTypes() { @@ -150,21 +156,15 @@ func addNewTestTypes() { ResourceVersion string `json:"resourceVersion,omitempty"` TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` } - api.Scheme.AddKnownTypes( - newGroupVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &unversioned.Status{}, - &ListOptions{}, &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}, - &unversioned.ExportOptions{}) + api.Scheme.AddKnownTypes(newGroupVersion, + &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &ListOptions{}, + &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) } func init() { // Certain API objects are returned regardless of the contents of storage: // api.Status is returned in errors - // "internal" version - api.Scheme.AddKnownTypes( - testInternalGroupVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &unversioned.Status{}, - &api.ListOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) - api.Scheme.AddInternalGroupVersion(testInternalGroupVersion) addGrouplessTypes() addTestTypes() addNewTestTypes() @@ -253,6 +253,8 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. Linker: selfLinker, Mapper: namespaceMapper, + ParameterCodec: api.ParameterCodec, + Admit: admissionControl, Context: requestContextMapper, } @@ -263,7 +265,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.Root = "/" + grouplessPrefix group.GroupVersion = grouplessGroupVersion group.OptionsExternalVersion = &grouplessGroupVersion - group.Codec = grouplessCodec + group.Serializer = latest.Codecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -275,7 +277,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.Root = "/" + prefix group.GroupVersion = testGroupVersion group.OptionsExternalVersion = &testGroupVersion - group.Codec = codec + group.Serializer = latest.Codecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -287,7 +289,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.Root = "/" + prefix group.GroupVersion = newGroupVersion group.OptionsExternalVersion = &newGroupVersion - group.Codec = newCodec + group.Serializer = latest.Codecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -430,7 +432,11 @@ func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Objec if id == "binary" { return storage.stream, storage.errors["get"] } - return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"] + copied, err := api.Scheme.Copy(&storage.item) + if err != nil { + panic(err) + } + return copied, storage.errors["get"] } func (storage *SimpleRESTStorage) checkContext(ctx api.Context) { @@ -645,7 +651,11 @@ func (storage *SimpleTypedStorage) New() runtime.Object { func (storage *SimpleTypedStorage) Get(ctx api.Context, id string) (runtime.Object, error) { storage.checkContext(ctx) - return api.Scheme.CopyOrDie(storage.item), storage.errors["get"] + copied, err := api.Scheme.Copy(storage.item) + if err != nil { + panic(err) + } + return copied, storage.errors["get"] } func (storage *SimpleTypedStorage) checkContext(ctx api.Context) { @@ -653,13 +663,16 @@ func (storage *SimpleTypedStorage) checkContext(ctx api.Context) { } func extractBody(response *http.Response, object runtime.Object) (string, error) { + return extractBodyDecoder(response, object, codec) +} + +func extractBodyDecoder(response *http.Response, object runtime.Object, decoder runtime.Decoder) (string, error) { defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { return string(body), err } - err = runtime.DecodeInto(codec, body, object) - return string(body), err + return string(body), runtime.DecodeInto(decoder, body, object) } func TestNotFound(t *testing.T) { @@ -1196,7 +1209,11 @@ func TestMetadata(t *testing.T) { matches[s] = i + 1 } } - if matches["text/plain,application/json"] == 0 || matches["application/json"] == 0 || matches["*/*"] == 0 || len(matches) != 3 { + if matches["text/plain,application/json,application/yaml"] == 0 || + matches["application/json,application/yaml"] == 0 || + matches["application/json"] == 0 || + matches["*/*"] == 0 || + len(matches) != 4 { t.Errorf("unexpected mime types: %v", matches) } } @@ -2060,7 +2077,7 @@ func TestUpdate(t *testing.T) { }, Other: "bar", } - body, err := runtime.Encode(codec, item) + body, err := runtime.Encode(testCodec, item) if err != nil { // The following cases will fail, so die now t.Fatalf("unexpected error: %v", err) @@ -2098,7 +2115,7 @@ func TestUpdateInvokesAdmissionControl(t *testing.T) { }, Other: "bar", } - body, err := runtime.Encode(codec, item) + body, err := runtime.Encode(testCodec, item) if err != nil { // The following cases will fail, so die now t.Fatalf("unexpected error: %v", err) @@ -2128,7 +2145,7 @@ func TestUpdateRequiresMatchingName(t *testing.T) { item := &apiservertesting.Simple{ Other: "bar", } - body, err := runtime.Encode(codec, item) + body, err := runtime.Encode(testCodec, item) if err != nil { // The following cases will fail, so die now t.Fatalf("unexpected error: %v", err) @@ -2161,7 +2178,7 @@ func TestUpdateAllowsMissingNamespace(t *testing.T) { }, Other: "bar", } - body, err := runtime.Encode(codec, item) + body, err := runtime.Encode(testCodec, item) if err != nil { // The following cases will fail, so die now t.Fatalf("unexpected error: %v", err) @@ -2200,7 +2217,7 @@ func TestUpdateAllowsMismatchedNamespaceOnError(t *testing.T) { }, Other: "bar", } - body, err := runtime.Encode(codec, item) + body, err := runtime.Encode(testCodec, item) if err != nil { // The following cases will fail, so die now t.Fatalf("unexpected error: %v", err) @@ -2238,7 +2255,7 @@ func TestUpdatePreventsMismatchedNamespace(t *testing.T) { }, Other: "bar", } - body, err := runtime.Encode(codec, item) + body, err := runtime.Encode(testCodec, item) if err != nil { // The following cases will fail, so die now t.Fatalf("unexpected error: %v", err) @@ -2274,7 +2291,7 @@ func TestUpdateMissing(t *testing.T) { }, Other: "bar", } - body, err := runtime.Encode(codec, item) + body, err := runtime.Encode(testCodec, item) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2304,7 +2321,7 @@ func TestCreateNotFound(t *testing.T) { client := http.Client{} simple := &apiservertesting.Simple{Other: "foo"} - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(testCodec, simple) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2331,7 +2348,7 @@ func TestCreateChecksDecode(t *testing.T) { client := http.Client{} simple := &api.Pod{} - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(codec, simple, testGroupVersion) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2373,7 +2390,9 @@ func TestUpdateREST(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Codec: newCodec, + + Serializer: latest.Codecs, + ParameterCodec: api.ParameterCodec, } } @@ -2455,7 +2474,9 @@ func TestParentResourceIsRequired(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Codec: newCodec, + + Serializer: latest.Codecs, + ParameterCodec: api.ParameterCodec, } container := restful.NewContainer() if err := group.InstallREST(container); err == nil { @@ -2484,7 +2505,9 @@ func TestParentResourceIsRequired(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Codec: newCodec, + + Serializer: latest.Codecs, + ParameterCodec: api.ParameterCodec, } container = restful.NewContainer() if err := group.InstallREST(container); err != nil { @@ -2522,7 +2545,7 @@ func TestCreateWithName(t *testing.T) { client := http.Client{} simple := &apiservertesting.Simple{Other: "foo"} - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(testCodec, simple) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2550,7 +2573,7 @@ func TestUpdateChecksDecode(t *testing.T) { client := http.Client{} simple := &api.Pod{} - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(codec, simple, testGroupVersion) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2563,7 +2586,7 @@ func TestUpdateChecksDecode(t *testing.T) { t.Errorf("unexpected error: %v", err) } if response.StatusCode != http.StatusBadRequest { - t.Errorf("Unexpected response %#v", response) + t.Errorf("Unexpected response %#v\n%s", response, readBodyOrDie(response.Body)) } b, err := ioutil.ReadAll(response.Body) if err != nil { @@ -2627,7 +2650,7 @@ func TestCreate(t *testing.T) { simple := &apiservertesting.Simple{ Other: "bar", } - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(testCodec, simple) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2651,7 +2674,7 @@ func TestCreate(t *testing.T) { var itemOut apiservertesting.Simple body, err := extractBody(response, &itemOut) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %v %#v", err, response) } if !reflect.DeepEqual(&itemOut, simple) { @@ -2665,6 +2688,74 @@ func TestCreate(t *testing.T) { } } +func TestCreateYAML(t *testing.T) { + storage := SimpleRESTStorage{ + injectedFunction: func(obj runtime.Object) (runtime.Object, error) { + time.Sleep(5 * time.Millisecond) + return obj, nil + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + name: "bar", + namespace: "default", + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/foo/bar", + } + handler := handleLinker(map[string]rest.Storage{"foo": &storage}, selfLinker) + server := httptest.NewServer(handler) + defer server.Close() + client := http.Client{} + + // yaml encoder + simple := &apiservertesting.Simple{ + Other: "bar", + } + serializer, ok := latest.Codecs.SerializerForMediaType("application/yaml", nil) + if !ok { + t.Fatal("No yaml serializer") + } + encoder := latest.Codecs.EncoderForVersion(serializer, testGroupVersion) + decoder := latest.Codecs.DecoderToVersion(serializer, testInternalGroupVersion) + + data, err := runtime.Encode(encoder, simple) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + request, err := http.NewRequest("POST", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/foo", bytes.NewBuffer(data)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + request.Header.Set("Accept", "application/yaml, application/json") + request.Header.Set("Content-Type", "application/yaml") + + wg := sync.WaitGroup{} + wg.Add(1) + var response *http.Response + go func() { + response, err = client.Do(request) + wg.Done() + }() + wg.Wait() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var itemOut apiservertesting.Simple + body, err := extractBodyDecoder(response, &itemOut, decoder) + if err != nil { + t.Fatalf("unexpected error: %v %#v", err, response) + } + + if !reflect.DeepEqual(&itemOut, simple) { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) + } + if response.StatusCode != http.StatusCreated { + t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } +} func TestCreateInNamespace(t *testing.T) { storage := SimpleRESTStorage{ injectedFunction: func(obj runtime.Object) (runtime.Object, error) { @@ -2687,13 +2778,13 @@ func TestCreateInNamespace(t *testing.T) { simple := &apiservertesting.Simple{ Other: "bar", } - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(testCodec, simple) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } request, err := http.NewRequest("POST", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/foo", bytes.NewBuffer(data)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } wg := sync.WaitGroup{} @@ -2705,13 +2796,13 @@ func TestCreateInNamespace(t *testing.T) { }() wg.Wait() if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } var itemOut apiservertesting.Simple body, err := extractBody(response, &itemOut) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v\n%s", err, data) } if !reflect.DeepEqual(&itemOut, simple) { @@ -2747,7 +2838,7 @@ func TestCreateInvokesAdmissionControl(t *testing.T) { simple := &apiservertesting.Simple{ Other: "bar", } - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(testCodec, simple) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2822,7 +2913,7 @@ func (obj *UnregisteredAPIObject) GetObjectKind() unversioned.ObjectKind { func TestWriteJSONDecodeError(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - writeJSON(http.StatusOK, codec, &UnregisteredAPIObject{"Undecodable"}, w, false) + writeNegotiated(latest.Codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"}) })) // TODO: Uncomment when fix #19254 // defer server.Close() @@ -2833,7 +2924,7 @@ func TestWriteJSONDecodeError(t *testing.T) { if status.Reason != unversioned.StatusReasonUnknown { t.Errorf("unexpected reason %#v", status) } - if !strings.Contains(status.Message, "type apiserver.UnregisteredAPIObject is not registered") { + if !strings.Contains(status.Message, "no kind is registered for the type apiserver.UnregisteredAPIObject") { t.Errorf("unexpected message %#v", status) } } @@ -2881,7 +2972,7 @@ func TestCreateTimeout(t *testing.T) { // defer server.Close() simple := &apiservertesting.Simple{Other: "foo"} - data, err := runtime.Encode(codec, simple) + data, err := runtime.Encode(testCodec, simple) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2993,7 +3084,7 @@ func TestCreateChecksAPIVersion(t *testing.T) { b, err := ioutil.ReadAll(response.Body) if err != nil { t.Errorf("unexpected error: %v", err) - } else if !strings.Contains(string(b), "does not match the specified apiVersion") { + } else if !strings.Contains(string(b), "does not match the expected API version") { t.Errorf("unexpected response: %s", string(b)) } } @@ -3044,15 +3135,15 @@ func TestUpdateChecksAPIVersion(t *testing.T) { simple := &apiservertesting.Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}} data, err := runtime.Encode(newCodec, simple) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } request, err := http.NewRequest("PUT", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/bar", bytes.NewBuffer(data)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } response, err := client.Do(request) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } if response.StatusCode != http.StatusBadRequest { t.Errorf("Unexpected response %#v", response) @@ -3060,7 +3151,15 @@ func TestUpdateChecksAPIVersion(t *testing.T) { b, err := ioutil.ReadAll(response.Body) if err != nil { t.Errorf("unexpected error: %v", err) - } else if !strings.Contains(string(b), "does not match the specified apiVersion") { + } else if !strings.Contains(string(b), "does not match the expected API version") { t.Errorf("unexpected response: %s", string(b)) } } + +func readBodyOrDie(r io.Reader) []byte { + body, err := ioutil.ReadAll(r) + if err != nil { + panic(err) + } + return body +} diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index d3d485e7d09..c7c5e9a90f1 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -19,6 +19,7 @@ package apiserver import ( "fmt" "net/http" + "strings" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/storage" @@ -105,3 +106,41 @@ func IsAPIPrefixNotFound(err error) bool { _, ok := err.(*errAPIPrefixNotFound) return ok } + +// errNotAcceptable indicates Accept negotiation has failed +// TODO: move to api/errors if other code needs to return this +type errNotAcceptable struct { + accepted []string +} + +func (e errNotAcceptable) Error() string { + return fmt.Sprintf("only the following media types are accepted: %v", strings.Join(e.accepted, ", ")) +} + +func (e errNotAcceptable) Status() unversioned.Status { + return unversioned.Status{ + Status: unversioned.StatusFailure, + Code: http.StatusNotAcceptable, + Reason: unversioned.StatusReason("NotAcceptable"), + Message: e.Error(), + } +} + +// errNotAcceptable indicates Content-Type is not recognized +// TODO: move to api/errors if other code needs to return this +type errUnsupportedMediaType struct { + accepted []string +} + +func (e errUnsupportedMediaType) Error() string { + return fmt.Sprintf("the body of the request was in an unknown format - accepted media types include: %v", strings.Join(e.accepted, ", ")) +} + +func (e errUnsupportedMediaType) Status() unversioned.Status { + return unversioned.Status{ + Status: unversioned.StatusFailure, + Code: http.StatusUnsupportedMediaType, + Reason: unversioned.StatusReason("UnsupportedMediaType"), + Message: e.Error(), + } +} diff --git a/pkg/apiserver/negotiate.go b/pkg/apiserver/negotiate.go new file mode 100644 index 00000000000..1457addbfc2 --- /dev/null +++ b/pkg/apiserver/negotiate.go @@ -0,0 +1,116 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "mime" + "net/http" + "strconv" + "strings" + + "bitbucket.org/ww/goautoneg" + + "k8s.io/kubernetes/pkg/runtime" +) + +func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, string, error) { + acceptHeader := req.Header.Get("Accept") + supported := ns.SupportedMediaTypes() + if len(acceptHeader) == 0 && len(supported) > 0 { + acceptHeader = supported[0] + } + accept, ok := negotiate(acceptHeader, supported) + if !ok { + return nil, "", errNotAcceptable{supported} + } + + pretty := isPrettyPrint(req) + if _, ok := accept.Params["pretty"]; !ok && pretty { + accept.Params["pretty"] = "1" + } + mediaType := accept.Type + if len(accept.SubType) > 0 { + mediaType += "/" + accept.SubType + } + if s, ok := ns.SerializerForMediaType(mediaType, accept.Params); ok { + return s, mediaType, nil + } + + return nil, "", errNotAcceptable{supported} +} + +func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.Serializer, error) { + supported := s.SupportedMediaTypes() + mediaType := req.Header.Get("Content-Type") + if len(mediaType) == 0 { + mediaType = supported[0] + } + mediaType, options, err := mime.ParseMediaType(mediaType) + if err != nil { + return nil, errUnsupportedMediaType{supported} + } + out, ok := s.SerializerForMediaType(mediaType, options) + if !ok { + return nil, errUnsupportedMediaType{supported} + } + return out, nil +} + +// isPrettyPrint returns true if the "pretty" query parameter is true or if the User-Agent +// matches known "human" clients. +func isPrettyPrint(req *http.Request) bool { + // DEPRECATED: should be part of the content type + if req.URL != nil { + pp := req.URL.Query().Get("pretty") + if len(pp) > 0 { + pretty, _ := strconv.ParseBool(pp) + return pretty + } + } + userAgent := req.UserAgent() + // This covers basic all browers and cli http tools + if strings.HasPrefix(userAgent, "curl") || strings.HasPrefix(userAgent, "Wget") || strings.HasPrefix(userAgent, "Mozilla/5.0") { + return true + } + return false +} + +// negotiate the most appropriate content type given the accept header and a list of +// alternatives. +func negotiate(header string, alternatives []string) (goautoneg.Accept, bool) { + alternates := make([][]string, 0, len(alternatives)) + for _, alternate := range alternatives { + alternates = append(alternates, strings.SplitN(alternate, "/", 2)) + } + for _, clause := range goautoneg.ParseAccept(header) { + for _, alternate := range alternates { + if clause.Type == alternate[0] && clause.SubType == alternate[1] { + return clause, true + } + if clause.Type == alternate[0] && clause.SubType == "*" { + clause.SubType = alternate[1] + return clause, true + } + if clause.Type == "*" && clause.SubType == "*" { + clause.Type = alternate[0] + clause.SubType = alternate[1] + return clause, true + } + } + } + return goautoneg.Accept{}, false +} diff --git a/pkg/apiserver/negotiate_test.go b/pkg/apiserver/negotiate_test.go new file mode 100644 index 00000000000..8e59d6d69cc --- /dev/null +++ b/pkg/apiserver/negotiate_test.go @@ -0,0 +1,252 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "net/http" + "net/url" + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" +) + +type fakeNegotiater struct { + serializer runtime.Serializer + types []string + mediaType string + options map[string]string +} + +func (n *fakeNegotiater) SupportedMediaTypes() []string { + return n.types +} + +func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { + n.mediaType = mediaType + if len(options) > 0 { + n.options = options + } + return n.serializer, n.serializer != nil +} + +func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { + return n.serializer +} + +func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder { + return n.serializer +} + +var fakeCodec = runtime.NewCodec(runtime.NoopEncoder{}, runtime.NoopDecoder{}) + +func TestNegotiate(t *testing.T) { + testCases := []struct { + accept string + req *http.Request + ns *fakeNegotiater + serializer runtime.Serializer + contentType string + params map[string]string + errFn func(error) bool + }{ + // pick a default + { + req: &http.Request{}, + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + }, + { + accept: "", + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + }, + { + accept: "*/*", + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + }, + { + accept: "application/*", + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + }, + { + accept: "application/json", + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + }, + { + accept: "application/json", + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json", "application/protobuf"}}, + serializer: fakeCodec, + }, + { + accept: "application/protobuf", + contentType: "application/protobuf", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json", "application/protobuf"}}, + serializer: fakeCodec, + }, + { + accept: "application/json; pretty=1", + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + params: map[string]string{"pretty": "1"}, + }, + { + accept: "unrecognized/stuff,application/json; pretty=1", + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + params: map[string]string{"pretty": "1"}, + }, + + // query param triggers pretty + { + req: &http.Request{ + Header: http.Header{"Accept": []string{"application/json"}}, + URL: &url.URL{RawQuery: "pretty=1"}, + }, + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + params: map[string]string{"pretty": "1"}, + }, + + // certain user agents trigger pretty + { + req: &http.Request{ + Header: http.Header{ + "Accept": []string{"application/json"}, + "User-Agent": []string{"curl"}, + }, + }, + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + params: map[string]string{"pretty": "1"}, + }, + { + req: &http.Request{ + Header: http.Header{ + "Accept": []string{"application/json"}, + "User-Agent": []string{"Wget"}, + }, + }, + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + params: map[string]string{"pretty": "1"}, + }, + { + req: &http.Request{ + Header: http.Header{ + "Accept": []string{"application/json"}, + "User-Agent": []string{"Mozilla/5.0"}, + }, + }, + contentType: "application/json", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application/json"}}, + serializer: fakeCodec, + params: map[string]string{"pretty": "1"}, + }, + + // "application" is not a valid media type, so the server will reject the response during + // negotiation (the server, in error, has specified an invalid media type) + { + accept: "application", + ns: &fakeNegotiater{serializer: fakeCodec, types: []string{"application"}}, + errFn: func(err error) bool { + return err.Error() == "only the following media types are accepted: application" + }, + }, + { + ns: &fakeNegotiater{types: []string{"a/b/c"}}, + errFn: func(err error) bool { + return err.Error() == "only the following media types are accepted: a/b/c" + }, + }, + { + ns: &fakeNegotiater{}, + errFn: func(err error) bool { + return err.Error() == "only the following media types are accepted: " + }, + }, + { + accept: "*/*", + ns: &fakeNegotiater{}, + errFn: func(err error) bool { + return err.Error() == "only the following media types are accepted: " + }, + }, + { + accept: "application/json", + ns: &fakeNegotiater{types: []string{"application/json"}}, + errFn: func(err error) bool { + return err.Error() == "only the following media types are accepted: application/json" + }, + }, + } + + for i, test := range testCases { + req := test.req + if req == nil { + req = &http.Request{Header: http.Header{}} + req.Header.Set("Accept", test.accept) + } + s, contentType, err := negotiateOutputSerializer(req, test.ns) + switch { + case err == nil && test.errFn != nil: + t.Errorf("%d: failed: expected error", i) + continue + case err != nil && test.errFn == nil: + t.Errorf("%d: failed: %v", i, err) + continue + case err != nil: + if !test.errFn(err) { + t.Errorf("%d: failed: %v", i, err) + } + status, ok := err.(statusError) + if !ok { + t.Errorf("%d: failed, error should be statusError: %v", i, err) + continue + } + if status.Status().Status != unversioned.StatusFailure || status.Status().Code != http.StatusNotAcceptable { + t.Errorf("%d: failed: %v", i, err) + continue + } + continue + } + if test.contentType != contentType { + t.Errorf("%d: unexpected %s %s", i, test.contentType, contentType) + } + if s != test.serializer { + t.Errorf("%d: unexpected %s %s", i, test.serializer, s) + } + if !reflect.DeepEqual(test.params, test.ns.options) { + t.Errorf("%d: unexpected %#v %#v", i, test.params, test.ns.options) + } + } +} diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 32474daaa9c..d640cb957f6 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/rest" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apiserver/metrics" "k8s.io/kubernetes/pkg/httplog" "k8s.io/kubernetes/pkg/runtime" @@ -44,7 +45,7 @@ import ( type ProxyHandler struct { prefix string storage map[string]rest.Storage - codec runtime.Codec + serializer runtime.NegotiatedSerializer context api.RequestContextMapper requestInfoResolver *RequestInfoResolver } @@ -98,20 +99,19 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } apiResource = resource + gv := unversioned.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion} + redirector, ok := storage.(rest.Redirector) if !ok { httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource) - httpCode = errorJSON(errors.NewMethodNotSupported(api.Resource(resource), "proxy"), r.codec, w) + httpCode = errorNegotiated(errors.NewMethodNotSupported(api.Resource(resource), "proxy"), r.serializer, gv, w, req) return } location, roundTripper, err := redirector.ResourceLocation(ctx, id) if err != nil { httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err) - status := errToAPIStatus(err) - code := int(status.Code) - writeJSON(code, r.codec, status, w, true) - httpCode = code + httpCode = errorNegotiated(err, r.serializer, gv, w, req) return } if location == nil { @@ -144,11 +144,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { newReq, err := http.NewRequest(req.Method, location.String(), req.Body) if err != nil { - status := errToAPIStatus(err) - code := int(status.Code) - writeJSON(code, r.codec, status, w, true) - notFound(w, req) - httpCode = code + httpCode = errorNegotiated(err, r.serializer, gv, w, req) return } httpCode = http.StatusOK @@ -161,7 +157,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // TODO convert this entire proxy to an UpgradeAwareProxy similar to // https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go. // That proxy needs to be modified to support multiple backends, not just 1. - if r.tryUpgrade(w, req, newReq, location, roundTripper) { + if r.tryUpgrade(w, req, newReq, location, roundTripper, gv) { return } @@ -210,15 +206,13 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } // tryUpgrade returns true if the request was handled. -func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper) bool { +func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper, gv unversioned.GroupVersion) bool { if !httpstream.IsUpgradeRequest(req) { return false } backendConn, err := proxyutil.DialURL(location, transport) if err != nil { - status := errToAPIStatus(err) - code := int(status.Code) - writeJSON(code, r.codec, status, w, true) + errorNegotiated(err, r.serializer, gv, w, req) return true } defer backendConn.Close() @@ -228,17 +222,13 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque // hijack, just for reference... requestHijackedConn, _, err := w.(http.Hijacker).Hijack() if err != nil { - status := errToAPIStatus(err) - code := int(status.Code) - writeJSON(code, r.codec, status, w, true) + errorNegotiated(err, r.serializer, gv, w, req) return true } defer requestHijackedConn.Close() if err = newReq.Write(backendConn); err != nil { - status := errToAPIStatus(err) - code := int(status.Code) - writeJSON(code, r.codec, status, w, true) + errorNegotiated(err, r.serializer, gv, w, req) return true } diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 41032066af1..43674b933ef 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" @@ -70,7 +69,8 @@ type ScopeNamer interface { type RequestScope struct { Namer ScopeNamer ContextFunc - runtime.Codec + Serializer runtime.NegotiatedSerializer + runtime.ParameterCodec Creater runtime.ObjectCreater Convertor runtime.ObjectConvertor @@ -79,6 +79,10 @@ type RequestScope struct { Subresource string } +func (scope *RequestScope) err(err error, req *restful.Request, res *restful.Response) { + errorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), res.ResponseWriter, req.Request) +} + // getterFunc performs a get request with the given context and object name. The request // may be used to deserialize an options object to pass to the getter. type getterFunc func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) @@ -93,7 +97,7 @@ func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunc w := res.ResponseWriter namespace, name, err := scope.Namer.Name(req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } ctx := scope.ContextFunc(req) @@ -101,14 +105,14 @@ func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunc result, err := getter(ctx, name, req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } if err := setSelfLink(result, req, scope.Namer); err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } - write(http.StatusOK, scope.Kind.GroupVersion(), scope.Codec, result, w, req.Request) + write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request) } } @@ -119,44 +123,41 @@ func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) restful.Rou // For performance tracking purposes. trace := util.NewTrace("Get " + req.Request.URL.Path) defer trace.LogIfLong(250 * time.Millisecond) - opts := v1.ExportOptions{} - if err := scope.Codec.DecodeParametersInto(req.Request.URL.Query(), &opts); err != nil { - return nil, err - } - internalOpts := unversioned.ExportOptions{} - scope.Convertor.Convert(&opts, &internalOpts) - if internalOpts.Export { - if e == nil { - return nil, errors.NewBadRequest("export unsupported") + + // check for export + if values := req.Request.URL.Query(); len(values) > 0 { + // TODO: this is internal version, not unversioned + exports := unversioned.ExportOptions{} + if err := scope.ParameterCodec.DecodeParameters(values, unversioned.GroupVersion{Version: "v1"}, &exports); err != nil { + return nil, err + } + if exports.Export { + if e == nil { + return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource)) + } + return e.Export(ctx, name, exports) } - return e.Export(ctx, name, internalOpts) } + return r.Get(ctx, name) }) } // GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object. -func GetResourceWithOptions(r rest.GetterWithOptions, e rest.Exporter, scope RequestScope, internalKind, externalKind unversioned.GroupVersionKind, subpath bool, subpathKey string) restful.RouteFunction { +func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope) restful.RouteFunction { return getResourceHandler(scope, func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) { - opts, err := getRequestOptions(req, scope, internalKind, externalKind, subpath, subpathKey) - if err != nil { + opts, subpath, subpathKey := r.NewGetOptions() + if err := getRequestOptions(req, scope, opts, subpath, subpathKey); err != nil { return nil, err } - exportOpts := unversioned.ExportOptions{} - if err := scope.Codec.DecodeParametersInto(req.Request.URL.Query(), &exportOpts); err != nil { - return nil, err - } - if exportOpts.Export { - return nil, errors.NewBadRequest("export unsupported") - } return r.Get(ctx, name, opts) }) } -func getRequestOptions(req *restful.Request, scope RequestScope, internalKind, externalKind unversioned.GroupVersionKind, subpath bool, subpathKey string) (runtime.Object, error) { - if internalKind.IsEmpty() { - return nil, nil +func getRequestOptions(req *restful.Request, scope RequestScope, into runtime.Object, subpath bool, subpathKey string) error { + if into == nil { + return nil } query := req.Request.URL.Query() @@ -168,37 +169,23 @@ func getRequestOptions(req *restful.Request, scope RequestScope, internalKind, e newQuery[subpathKey] = []string{req.PathParameter("path")} query = newQuery } - - versioned, err := scope.Creater.New(externalKind) - if err != nil { - return nil, err - } - - if err := scope.Codec.DecodeParametersInto(query, versioned); err != nil { - return nil, errors.NewBadRequest(err.Error()) - } - out, err := scope.Convertor.ConvertToVersion(versioned, internalKind.GroupVersion().String()) - if err != nil { - // programmer error - return nil, err - } - return out, nil + return scope.ParameterCodec.DecodeParameters(query, scope.Kind.GroupVersion(), into) } // ConnectResource returns a function that handles a connect request on a rest.Storage object. -func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admission.Interface, internalKind, externalKind unversioned.GroupVersionKind, restPath string, subpath bool, subpathKey string) restful.RouteFunction { +func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admission.Interface, restPath string) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { w := res.ResponseWriter namespace, name, err := scope.Namer.Name(req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } ctx := scope.ContextFunc(req) ctx = api.WithNamespace(ctx, namespace) - opts, err := getRequestOptions(req, scope, internalKind, externalKind, subpath, subpathKey) - if err != nil { - errorJSON(err, scope.Codec, w) + opts, subpath, subpathKey := connecter.NewConnectOptions() + if err := getRequestOptions(req, scope, opts, subpath, subpathKey); err != nil { + scope.err(err, req, res) return } if admit.Handles(admission.Connect) { @@ -211,13 +198,13 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi err = admit.Admit(admission.NewAttributesRecord(connectRequest, scope.Kind.GroupKind(), namespace, name, scope.Resource.GroupResource(), scope.Subresource, admission.Connect, userInfo)) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } - handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req.Request, w: w}) + handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, res: res}) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } handler.ServeHTTP(w, req.Request) @@ -227,16 +214,16 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi // responder implements rest.Responder for assisting a connector in writing objects or errors. type responder struct { scope RequestScope - req *http.Request - w http.ResponseWriter + req *restful.Request + res *restful.Response } func (r *responder) Object(statusCode int, obj runtime.Object) { - write(statusCode, r.scope.Kind.GroupVersion(), r.scope.Codec, obj, r.w, r.req) + write(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.res.ResponseWriter, r.req.Request) } func (r *responder) Error(err error) { - errorJSON(err, r.scope.Codec, r.w) + r.scope.err(err, r.req, r.res) } // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object. @@ -249,7 +236,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch namespace, err := scope.Namer.Namespace(req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } @@ -264,19 +251,9 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch ctx := scope.ContextFunc(req) ctx = api.WithNamespace(ctx, namespace) - listOptionsGVK := scope.Kind.GroupVersion().WithKind("ListOptions") - versioned, err := scope.Creater.New(listOptionsGVK) - if err != nil { - errorJSON(err, scope.Codec, w) - return - } - if err := scope.Codec.DecodeParametersInto(req.Request.URL.Query(), versioned); err != nil { - errorJSON(err, scope.Codec, w) - return - } opts := api.ListOptions{} - if err := scope.Convertor.Convert(versioned, &opts); err != nil { - errorJSON(err, scope.Codec, w) + if err := scope.ParameterCodec.DecodeParameters(req.Request.URL.Query(), scope.Kind.GroupVersion(), &opts); err != nil { + scope.err(err, req, res) return } @@ -289,7 +266,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil { // TODO: allow bad request to set field causes based on query parameters err = errors.NewBadRequest(err.Error()) - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } @@ -305,11 +282,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch // and a field selector, since just the name is // sufficient to narrow down the request to a // single object. - errorJSON( - errors.NewBadRequest("both a name and a field selector provided; please provide one or the other."), - scope.Codec, - w, - ) + scope.err(errors.NewBadRequest("both a name and a field selector provided; please provide one or the other."), req, res) return } opts.FieldSelector = nameSelector @@ -318,7 +291,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch if (opts.Watch || forceWatch) && rw != nil { watcher, err := rw.Watch(ctx, &opts) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } // TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=. @@ -329,7 +302,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch if timeout == 0 && minRequestTimeout > 0 { timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) } - serveWatch(watcher, scope, w, req, timeout) + serveWatch(watcher, scope, req, res, timeout) return } @@ -338,17 +311,17 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch trace.Step("About to List from storage") result, err := r.List(ctx, &opts) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } trace.Step("Listing from storage done") numberOfItems, err := setListSelfLink(result, req, scope.Namer) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } trace.Step("Self-linking done") - write(http.StatusOK, scope.Kind.GroupVersion(), scope.Codec, result, w, req.Request) + write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request) trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems)) } } @@ -374,25 +347,39 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object namespace, err = scope.Namer.Namespace(req) } if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } ctx := scope.ContextFunc(req) ctx = api.WithNamespace(ctx, namespace) + gv := scope.Kind.GroupVersion() + s, err := negotiateInputSerializer(req.Request, scope.Serializer) + if err != nil { + scope.err(err, req, res) + return + } + decoder := scope.Serializer.DecoderToVersion(s, unversioned.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal}) + body, err := readBody(req.Request) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } - obj := r.New() + defaultGVK := scope.Kind + original := r.New() trace.Step("About to convert to expected version") - // TODO this cleans up with proper typing - if err := scope.Codec.DecodeIntoWithSpecifiedVersionKind(body, obj, scope.Kind); err != nil { - err = transformDecodeError(typer, err, obj, body) - errorJSON(err, scope.Codec, w) + obj, gvk, err := decoder.Decode(body, &defaultGVK, original) + if err != nil { + err = transformDecodeError(typer, err, original, gvk) + scope.err(err, req, res) + return + } + if gvk.GroupVersion() != gv { + err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%v)", gvk.GroupVersion().String(), gv.String())) + scope.err(err, req, res) return } trace.Step("Conversion done") @@ -402,7 +389,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object err = admit.Admit(admission.NewAttributesRecord(obj, scope.Kind.GroupKind(), namespace, name, scope.Resource.GroupResource(), scope.Subresource, admission.Create, userInfo)) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } @@ -416,18 +403,18 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object return out, err }) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } trace.Step("Object stored in database") if err := setSelfLink(result, req, scope.Namer); err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } trace.Step("Self-link added") - write(http.StatusCreated, scope.Kind.GroupVersion(), scope.Codec, result, w, req.Request) + write(http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request) } } @@ -462,7 +449,7 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper namespace, name, err := scope.Namer.Name(req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } @@ -471,10 +458,11 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper versionedObj, err := converter.ConvertToVersion(r.New(), scope.Kind.GroupVersion().String()) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } + // TODO: handle this in negotiation contentType := req.HeaderParameter("Content-Type") // Remove "; charset=" if included in header. if idx := strings.Index(contentType, ";"); idx > 0 { @@ -484,10 +472,21 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper patchJS, err := readBody(req.Request) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } + s, ok := scope.Serializer.SerializerForMediaType("application/json", nil) + if !ok { + scope.err(fmt.Errorf("no serializer defined for JSON"), req, res) + return + } + gv := scope.Kind.GroupVersion() + codec := runtime.NewCodec( + scope.Serializer.EncoderForVersion(s, gv), + scope.Serializer.DecoderToVersion(s, unversioned.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal}), + ) + updateAdmit := func(updatedObject runtime.Object) error { if admit != nil && admit.Handles(admission.Update) { userInfo, _ := api.UserFrom(ctx) @@ -497,18 +496,18 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper return nil } - result, err := patchResource(ctx, updateAdmit, timeout, versionedObj, r, name, patchType, patchJS, scope.Namer, scope.Codec) + result, err := patchResource(ctx, updateAdmit, timeout, versionedObj, r, name, patchType, patchJS, scope.Namer, codec) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } if err := setSelfLink(result, req, scope.Namer); err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } - write(http.StatusOK, scope.Kind.GroupVersion(), scope.Codec, result, w, req.Request) + write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request) } } @@ -625,7 +624,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType namespace, name, err := scope.Namer.Name(req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } ctx := scope.ContextFunc(req) @@ -633,21 +632,33 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType body, err := readBody(req.Request) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } - obj := r.New() + s, err := negotiateInputSerializer(req.Request, scope.Serializer) + if err != nil { + scope.err(err, req, res) + return + } + defaultGVK := scope.Kind + original := r.New() trace.Step("About to convert to expected version") - if err := scope.Codec.DecodeIntoWithSpecifiedVersionKind(body, obj, scope.Kind); err != nil { - err = transformDecodeError(typer, err, obj, body) - errorJSON(err, scope.Codec, w) + obj, gvk, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, original) + if err != nil { + err = transformDecodeError(typer, err, original, gvk) + scope.err(err, req, res) + return + } + if gvk.GroupVersion() != defaultGVK.GroupVersion() { + err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%s)", gvk.GroupVersion(), defaultGVK.GroupVersion())) + scope.err(err, req, res) return } trace.Step("Conversion done") if err := checkName(obj, name, namespace, scope.Namer); err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } @@ -656,7 +667,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType err = admit.Admit(admission.NewAttributesRecord(obj, scope.Kind.GroupKind(), namespace, name, scope.Resource.GroupResource(), scope.Subresource, admission.Update, userInfo)) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } @@ -669,13 +680,13 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType return obj, err }) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } trace.Step("Object stored in database") if err := setSelfLink(result, req, scope.Namer); err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } trace.Step("Self-link added") @@ -684,7 +695,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType if wasCreated { status = http.StatusCreated } - writeJSON(status, scope.Codec, result, w, isPrettyPrint(req.Request)) + write(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request) } } @@ -702,7 +713,7 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, namespace, name, err := scope.Namer.Name(req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } ctx := scope.ContextFunc(req) @@ -712,12 +723,23 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, if checkBody { body, err := readBody(req.Request) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } if len(body) > 0 { - if err := scope.Codec.DecodeInto(body, options); err != nil { - errorJSON(err, scope.Codec, w) + s, err := negotiateInputSerializer(req.Request, scope.Serializer) + if err != nil { + scope.err(err, req, res) + return + } + defaultGVK := scope.Kind.GroupVersion().WithKind("DeleteOptions") + obj, _, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, options) + if err != nil { + scope.err(err, req, res) + return + } + if obj != options { + scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), req, res) return } } @@ -728,7 +750,7 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, err = admit.Admit(admission.NewAttributesRecord(nil, scope.Kind.GroupKind(), namespace, name, scope.Resource.GroupResource(), scope.Subresource, admission.Delete, userInfo)) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } @@ -738,7 +760,7 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, return r.Delete(ctx, name, options) }) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } trace.Step("Object deleted from database") @@ -758,12 +780,12 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, // when a non-status response is returned, set the self link if _, ok := result.(*unversioned.Status); !ok { if err := setSelfLink(result, req, scope.Namer); err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } } - write(http.StatusOK, scope.Kind.GroupVersion(), scope.Codec, result, w, req.Request) + write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request) } } @@ -777,7 +799,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco namespace, err := scope.Namer.Namespace(req) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } @@ -789,24 +811,14 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco err = admit.Admit(admission.NewAttributesRecord(nil, scope.Kind.GroupKind(), namespace, "", scope.Resource.GroupResource(), scope.Subresource, admission.Delete, userInfo)) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } - listOptionsGVK := scope.Kind.GroupVersion().WithKind("ListOptions") - versioned, err := scope.Creater.New(listOptionsGVK) - if err != nil { - errorJSON(err, scope.Codec, w) - return - } - if err := scope.Codec.DecodeParametersInto(req.Request.URL.Query(), versioned); err != nil { - errorJSON(err, scope.Codec, w) - return - } listOptions := api.ListOptions{} - if err := scope.Convertor.Convert(versioned, &listOptions); err != nil { - errorJSON(err, scope.Codec, w) + if err := scope.ParameterCodec.DecodeParameters(req.Request.URL.Query(), scope.Kind.GroupVersion(), &listOptions); err != nil { + scope.err(err, req, res) return } @@ -819,7 +831,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco if listOptions.FieldSelector, err = listOptions.FieldSelector.Transform(fn); err != nil { // TODO: allow bad request to set field causes based on query parameters err = errors.NewBadRequest(err.Error()) - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } @@ -828,12 +840,23 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco if checkBody { body, err := readBody(req.Request) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } if len(body) > 0 { - if err := scope.Codec.DecodeInto(body, options); err != nil { - errorJSON(err, scope.Codec, w) + s, err := negotiateInputSerializer(req.Request, scope.Serializer) + if err != nil { + scope.err(err, req, res) + return + } + defaultGVK := scope.Kind.GroupVersion().WithKind("DeleteOptions") + obj, _, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, options) + if err != nil { + scope.err(err, req, res) + return + } + if obj != options { + scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), req, res) return } } @@ -843,7 +866,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco return r.DeleteCollection(ctx, options, &listOptions) }) if err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } @@ -861,12 +884,12 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco // when a non-status response is returned, set the self link if _, ok := result.(*unversioned.Status); !ok { if _, err := setListSelfLink(result, req, scope.Namer); err != nil { - errorJSON(err, scope.Codec, w) + scope.err(err, req, res) return } } } - write(http.StatusOK, scope.Kind.GroupVersion(), scope.Codec, result, w, req.Request) + writeNegotiated(scope.Serializer, scope.Kind.GroupVersion(), w, req.Request, http.StatusOK, result) } } @@ -911,15 +934,15 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, } // transformDecodeError adds additional information when a decode fails. -func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, body []byte) error { - objectGroupVersionKind, err := typer.ObjectKind(into) +func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *unversioned.GroupVersionKind) error { + objGVK, err := typer.ObjectKind(into) if err != nil { return err } - if dataGroupVersionKind, err := typer.DataKind(body); err == nil && len(dataGroupVersionKind.Kind) > 0 { - return errors.NewBadRequest(fmt.Sprintf("%s in version %v cannot be handled as a %s: %v", dataGroupVersionKind.Kind, dataGroupVersionKind.GroupVersion(), objectGroupVersionKind.Kind, baseErr)) + if gvk != nil && len(gvk.Kind) > 0 { + return errors.NewBadRequest(fmt.Sprintf("%s in version %q cannot be handled as a %s: %v", gvk.Kind, gvk.Version, objGVK.Kind, baseErr)) } - return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v", objectGroupVersionKind.Kind, baseErr)) + return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v", objGVK.Kind, baseErr)) } // setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request diff --git a/pkg/apiserver/resthandler_test.go b/pkg/apiserver/resthandler_test.go index 53d6f0c86c0..0ddf0056b72 100644 --- a/pkg/apiserver/resthandler_test.go +++ b/pkg/apiserver/resthandler_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/runtime" @@ -155,7 +156,7 @@ func (tc *patchTestCase) Run(t *testing.T) { namespace := tc.startingPod.Namespace name := tc.startingPod.Name - codec := registered.GroupOrDie(api.GroupName).Codec + codec := testapi.Default.Codec() admit := tc.admit if admit == nil { admit = func(updatedObject runtime.Object) error { diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 3519e729079..8b951d9e539 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -65,23 +65,34 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { } // serveWatch handles serving requests to the server -func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, timeout time.Duration) { - watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) { +func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) { + s, mediaType, err := negotiateOutputSerializer(req.Request, scope.Serializer) + if err != nil { + scope.err(err, req, res) + return + } + // TODO: replace with typed serialization + if mediaType != "application/json" { + writeRawJSON(http.StatusNotAcceptable, (errNotAcceptable{[]string{"application/json"}}).Status(), res.ResponseWriter) + return + } + encoder := scope.Serializer.EncoderForVersion(s, scope.Kind.GroupVersion()) + watchServer := &WatchServer{watcher, encoder, func(obj runtime.Object) { if err := setSelfLink(obj, req, scope.Namer); err != nil { glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err) } }, &realTimeoutFactory{timeout}} if isWebsocketRequest(req.Request) { - websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req.Request) + websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(res.ResponseWriter), req.Request) } else { - watchServer.ServeHTTP(w, req.Request) + watchServer.ServeHTTP(res.ResponseWriter, req.Request) } } // WatchServer serves a watch.Interface over a websocket or vanilla HTTP. type WatchServer struct { watching watch.Interface - codec runtime.Codec + encoder runtime.Encoder fixup func(runtime.Object) t timeoutFactory } @@ -108,7 +119,7 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) { return } w.fixup(event.Object) - obj, err := watchjson.Object(w.codec, &event) + obj, err := watchjson.Object(w.encoder, &event) if err != nil { // Client disconnect. w.watching.Stop() @@ -134,20 +145,21 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { cn, ok := w.(http.CloseNotifier) if !ok { - loggedW.Addf("unable to get CloseNotifier") + loggedW.Addf("unable to get CloseNotifier: %#v", w) http.NotFound(w, req) return } flusher, ok := w.(http.Flusher) if !ok { - loggedW.Addf("unable to get Flusher") + loggedW.Addf("unable to get Flusher: %#v", w) http.NotFound(w, req) return } w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) flusher.Flush() - encoder := watchjson.NewEncoder(w, self.codec) + // TODO: use arbitrary serialization on watch + encoder := watchjson.NewEncoder(w, self.encoder) for { select { case <-cn.CloseNotify(): diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 130a799c19a..4ff824e000c 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -169,6 +169,34 @@ func TestWatchHTTP(t *testing.T) { } } +func TestWatchHTTPAccept(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := handle(map[string]rest.Storage{"simples": simpleStorage}) + server := httptest.NewServer(handler) + defer server.Close() + client := http.Client{} + + dest, _ := url.Parse(server.URL) + dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/simples" + dest.RawQuery = "" + + request, err := http.NewRequest("GET", dest.String(), nil) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + request.Header.Set("Accept", "application/yaml") + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + // TODO: once this is fixed, this test will change + if response.StatusCode != http.StatusNotAcceptable { + t.Errorf("Unexpected response %#v", response) + } +} + func TestWatchParamParsing(t *testing.T) { simpleStorage := &SimpleRESTStorage{} handler := handle(map[string]rest.Storage{ diff --git a/pkg/genericapiserver/genericapiserver.go b/pkg/genericapiserver/genericapiserver.go index c538d557b28..18be54f5cae 100644 --- a/pkg/genericapiserver/genericapiserver.go +++ b/pkg/genericapiserver/genericapiserver.go @@ -148,6 +148,15 @@ type APIGroupInfo struct { // If nil, defaults to groupMeta.GroupVersion. // TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed. OptionsExternalVersion *unversioned.GroupVersion + + // Scheme includes all of the types used by this group and how to convert between them (or + // to convert objects from outside of this group that are accepted in this API). + // TODO: replace with interfaces + Scheme *runtime.Scheme + // NegotiatedSerializer controls how this group encodes and decodes data + NegotiatedSerializer runtime.NegotiatedSerializer + // ParameterCodec performs conversions for query parameters passed to API calls + ParameterCodec runtime.ParameterCodec } // Config is a structure used to configure a GenericAPIServer. @@ -275,6 +284,10 @@ type GenericAPIServer struct { // storage contains the RESTful endpoints exposed by this GenericAPIServer storage map[string]rest.Storage + // Serializer controls how common API objects not in a group/version prefix are serialized for this server. + // Individual APIGroups may define their own serializers. + Serializer runtime.NegotiatedSerializer + // "Outputs" Handler http.Handler InsecureHandler http.Handler diff --git a/pkg/genericapiserver/genericapiserver_test.go b/pkg/genericapiserver/genericapiserver_test.go index 671f6cccce4..e12e7e9b2a0 100644 --- a/pkg/genericapiserver/genericapiserver_test.go +++ b/pkg/genericapiserver/genericapiserver_test.go @@ -94,6 +94,7 @@ func TestInstallAPIGroups(t *testing.T) { config.ProxyTLSClientConfig = &tls.Config{} config.APIPrefix = "/apiPrefix" config.APIGroupPrefix = "/apiGroupPrefix" + config.Serializer = latest.Codecs s := New(&config) apiGroupMeta := registered.GroupOrDie(api.GroupName) diff --git a/test/integration/master_test.go b/test/integration/master_test.go index cfd95db08c6..bd389fa3b88 100644 --- a/test/integration/master_test.go +++ b/test/integration/master_test.go @@ -51,3 +51,73 @@ func TestWatchSucceedsWithoutArgs(t *testing.T) { } resp.Body.Close() } + +func TestAccept(t *testing.T) { + _, s := framework.RunAMaster(t) + defer s.Close() + + resp, err := http.Get(s.URL + "/api/") + if err != nil { + t.Fatalf("unexpected error getting api: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("got status %v instead of 200 OK", resp.StatusCode) + } + + body, _ := ioutil.ReadAll(resp.Body) + if resp.Header.Get("Content-Type") != "application/json" { + t.Errorf("unexpected content: %s", body) + } + if err := json.Unmarshal(body, &map[string]interface{}{}); err != nil { + t.Fatal(err) + } + + req, err := http.NewRequest("GET", s.URL+"/api/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Accept", "application/yaml") + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + body, _ = ioutil.ReadAll(resp.Body) + if resp.Header.Get("Content-Type") != "application/yaml" { + t.Errorf("unexpected content: %s", body) + } + t.Logf("body: %s", body) + if err := yaml.Unmarshal(body, &map[string]interface{}{}); err != nil { + t.Fatal(err) + } + + req, err = http.NewRequest("GET", s.URL+"/api/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Accept", "application/json, application/yaml") + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + body, _ = ioutil.ReadAll(resp.Body) + if resp.Header.Get("Content-Type") != "application/json" { + t.Errorf("unexpected content: %s", body) + } + t.Logf("body: %s", body) + if err := yaml.Unmarshal(body, &map[string]interface{}{}); err != nil { + t.Fatal(err) + } + + req, err = http.NewRequest("GET", s.URL+"/api/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Accept", "application") // not a valid media type + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusNotAcceptable { + t.Errorf("unexpected error from the server") + } +}