From d167c11b59fceac66e1901d9c7718b90f09563b8 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 9 Feb 2015 09:47:13 -0500 Subject: [PATCH] Remove layers of indirection between apiinstaller and resthandler Make the RESTHandler feel more go-restful, set the stage for adding new types of subresource collections. --- pkg/api/errors/errors.go | 17 + pkg/apiserver/api_installer.go | 144 ++++++-- pkg/apiserver/apiserver.go | 59 +-- pkg/apiserver/apiserver_test.go | 210 ++++++++++- pkg/apiserver/errors.go | 16 +- pkg/apiserver/interfaces.go | 10 +- pkg/apiserver/resthandler.go | 581 ++++++++++++++++-------------- pkg/apiserver/resthandler_test.go | 69 ---- pkg/apiserver/watch.go | 18 +- test/integration/auth_test.go | 67 ++-- 10 files changed, 747 insertions(+), 444 deletions(-) delete mode 100644 pkg/apiserver/resthandler_test.go diff --git a/pkg/api/errors/errors.go b/pkg/api/errors/errors.go index e4345761ab1..6507ed8082c 100644 --- a/pkg/api/errors/errors.go +++ b/pkg/api/errors/errors.go @@ -29,6 +29,12 @@ import ( const ( StatusUnprocessableEntity = 422 StatusTooManyRequests = 429 + // HTTP recommendations are for servers to define 5xx error codes + // for scenarios not covered by behavior. In this case, TryAgainLater + // is an indication that a transient server error has occured and the + // client *should* retry, with an optional Retry-After header to specify + // the back off window. + StatusTryAgainLater = 504 ) // StatusError is an error intended for consumption by a REST API server; it can also be @@ -202,6 +208,17 @@ func NewInternalError(err error) error { }} } +// NewTimeoutError returns an error indicating that a timeout occurred before the request +// could be completed. Clients may retry, but the operation may still complete. +func NewTimeoutError(message string) error { + return &StatusError{api.Status{ + Status: api.StatusFailure, + Code: StatusTryAgainLater, + Reason: api.StatusReasonTimeout, + Message: fmt.Sprintf("Timeout: %s", message), + }} +} + // IsNotFound returns true if the specified error was created by NewNotFoundErr. func IsNotFound(err error) bool { return reasonForError(err) == api.StatusReasonNotFound diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index c0b19bb7a00..7ae84677fd0 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -17,20 +17,24 @@ limitations under the License. package apiserver import ( + "fmt" "net/http" + "net/url" + gpath "path" "reflect" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/emicklei/go-restful" ) type APIInstaller struct { - prefix string // Path prefix where API resources are to be registered. - version string // The API version being installed. - restHandler *RESTHandler - mapper meta.RESTMapper + group *APIGroupVersion + prefix string // Path prefix where API resources are to be registered. + version string // The API version being installed. } // Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc). @@ -49,16 +53,16 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { // Initialize the custom handlers. watchHandler := (&WatchHandler{ - storage: a.restHandler.storage, - codec: a.restHandler.codec, - canonicalPrefix: a.restHandler.canonicalPrefix, - selfLinker: a.restHandler.selfLinker, - apiRequestInfoResolver: a.restHandler.apiRequestInfoResolver, + storage: a.group.storage, + codec: a.group.codec, + prefix: a.group.prefix, + linker: a.group.linker, + info: a.group.info, }) - redirectHandler := (&RedirectHandler{a.restHandler.storage, a.restHandler.codec, a.restHandler.apiRequestInfoResolver}) - proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.restHandler.storage, a.restHandler.codec, a.restHandler.apiRequestInfoResolver}) + redirectHandler := (&RedirectHandler{a.group.storage, a.group.codec, a.group.info}) + proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.storage, a.group.codec, a.group.info}) - for path, storage := range a.restHandler.storage { + for path, storage := range a.group.storage { if err := a.registerResourceHandlers(path, storage, ws, watchHandler, redirectHandler, proxyHandler); err != nil { errors = append(errors, err) } @@ -78,8 +82,11 @@ func (a *APIInstaller) newWebService() *restful.WebService { } func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage, ws *restful.WebService, watchHandler http.Handler, redirectHandler http.Handler, proxyHandler http.Handler) error { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). - restVerbHandler := restfulStripPrefix(a.prefix, a.restHandler) + codec := a.group.codec + admit := a.group.admit + linker := a.group.linker + resource := path + object := storage.New() // TODO: add scheme to APIInstaller rather than using api.Scheme _, kind, err := api.Scheme.ObjectVersionAndKind(object) @@ -103,28 +110,31 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage versionedList = indirectArbitraryPointer(versionedListPtr) } - mapping, err := a.mapper.RESTMapping(kind, a.version) + mapping, err := a.group.mapper.RESTMapping(kind, a.version) if err != nil { return err } // what verbs are supported by the storage, used to know what verbs we support per path storageVerbs := map[string]bool{} - if _, ok := storage.(RESTCreater); ok { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). + creater, ok := storage.(RESTCreater) + if ok { storageVerbs["RESTCreater"] = true } - if _, ok := storage.(RESTLister); ok { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). + lister, ok := storage.(RESTLister) + if ok { storageVerbs["RESTLister"] = true } - if _, ok := storage.(RESTGetter); ok { + getter, ok := storage.(RESTGetter) + if ok { storageVerbs["RESTGetter"] = true } - if _, ok := storage.(RESTDeleter); ok { + deleter, ok := storage.(RESTDeleter) + if ok { storageVerbs["RESTDeleter"] = true } - if _, ok := storage.(RESTUpdater); ok { + updater, ok := storage.(RESTUpdater) + if ok { storageVerbs["RESTUpdater"] = true } if _, ok := storage.(ResourceWatcher); ok { @@ -134,6 +144,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage storageVerbs["Redirector"] = true } + var namespaceFn ResourceNamespaceFunc + var nameFn ResourceNameFunc + var generateLinkFn linkFunc + var objNameFn ObjectNameFunc + linkFn := func(req *restful.Request, obj runtime.Object) error { + return setSelfLink(obj, req.Request, a.group.linker, generateLinkFn) + } + allowWatchList := storageVerbs["ResourceWatcher"] && storageVerbs["RESTLister"] // watching on lists is allowed only for kinds that support both watch and list. scope := mapping.Scope nameParam := ws.PathParameter("name", "name of the "+kind).DataType("string") @@ -141,6 +159,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage actions := []action{} // Get the list of actions for the given scope. if scope.Name() != meta.RESTScopeNameNamespace { + objNameFn = func(obj runtime.Object) (namespace, name string, err error) { + name, err = linker.Name(obj) + return + } + // Handler for standard REST verbs (GET, PUT, POST and DELETE). actions = appendIf(actions, action{"LIST", path, params}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", path, params}, storageVerbs["RESTCreater"]) @@ -148,6 +171,19 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage itemPath := path + "/{name}" nameParams := append(params, nameParam) + namespaceFn = func(req *restful.Request) (namespace string, err error) { + return + } + nameFn = func(req *restful.Request) (namespace, name string, err error) { + name = req.PathParameter("name") + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, 1) + path = gpath.Join(a.prefix, path) + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -156,18 +192,46 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage actions = appendIf(actions, action{"PROXY", "/proxy/" + itemPath + "/{path:*}", nameParams}, storageVerbs["Redirector"]) actions = appendIf(actions, action{"PROXY", "/proxy/" + itemPath, nameParams}, storageVerbs["Redirector"]) } else { + objNameFn = func(obj runtime.Object) (namespace, name string, err error) { + if name, err = linker.Name(obj); err != nil { + return + } + namespace, err = linker.Namespace(obj) + return + } + // v1beta3 format with namespace in path if scope.ParamPath() { // Handler for standard REST verbs (GET, PUT, POST and DELETE). namespaceParam := ws.PathParameter(scope.ParamName(), scope.ParamDescription()).DataType("string") namespacedPath := scope.ParamName() + "/{" + scope.ParamName() + "}/" + path namespaceParams := []*restful.Parameter{namespaceParam} + namespaceFn = func(req *restful.Request) (namespace string, err error) { + namespace = req.PathParameter(scope.ParamName()) + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return + } + actions = appendIf(actions, action{"LIST", namespacedPath, namespaceParams}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", namespacedPath, namespaceParams}, storageVerbs["RESTCreater"]) actions = appendIf(actions, action{"WATCHLIST", "/watch/" + namespacedPath, namespaceParams}, allowWatchList) itemPath := namespacedPath + "/{name}" nameParams := append(namespaceParams, nameParam) + nameFn = func(req *restful.Request) (namespace, name string, err error) { + namespace, _ = namespaceFn(req) + name = req.PathParameter("name") + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, 1) + path = strings.Replace(path, "{"+scope.ParamName()+"}", namespace, 1) + path = gpath.Join(a.prefix, path) + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -184,12 +248,36 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage // v1beta1/v1beta2 format where namespace was a query parameter namespaceParam := ws.QueryParameter(scope.ParamName(), scope.ParamDescription()).DataType("string") namespaceParams := []*restful.Parameter{namespaceParam} + namespaceFn = func(req *restful.Request) (namespace string, err error) { + namespace = req.QueryParameter(scope.ParamName()) + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return + } + actions = appendIf(actions, action{"LIST", path, namespaceParams}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", path, namespaceParams}, storageVerbs["RESTCreater"]) actions = appendIf(actions, action{"WATCHLIST", "/watch/" + path, namespaceParams}, allowWatchList) itemPath := path + "/{name}" nameParams := append(namespaceParams, nameParam) + nameFn = func(req *restful.Request) (namespace, name string, err error) { + namespace, _ = namespaceFn(req) + name = req.PathParameter("name") + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, -1) + path = gpath.Join(a.prefix, path) + if len(namespace) > 0 { + values := make(url.Values) + values.Set(scope.ParamName(), namespace) + query = values.Encode() + } + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -220,35 +308,35 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage for _, action := range actions { switch action.Verb { case "GET": // Get a resource. - route := ws.GET(action.Path).To(restVerbHandler). + route := ws.GET(action.Path).To(GetResource(getter, nameFn, linkFn, codec)). Doc("read the specified " + kind). Operation("read" + kind). Writes(versionedObject) addParams(route, action.Params) ws.Route(route) case "LIST": // List all resources of a kind. - route := ws.GET(action.Path).To(restVerbHandler). + route := ws.GET(action.Path).To(ListResource(lister, namespaceFn, linkFn, codec)). Doc("list objects of kind " + kind). Operation("list" + kind). Writes(versionedList) addParams(route, action.Params) ws.Route(route) case "PUT": // Update a resource. - route := ws.PUT(action.Path).To(restVerbHandler). + route := ws.PUT(action.Path).To(UpdateResource(updater, nameFn, objNameFn, linkFn, codec, resource, admit)). Doc("update the specified " + kind). Operation("update" + kind). Reads(versionedObject) addParams(route, action.Params) ws.Route(route) case "POST": // Create a resource. - route := ws.POST(action.Path).To(restVerbHandler). + route := ws.POST(action.Path).To(CreateResource(creater, namespaceFn, linkFn, codec, resource, admit)). Doc("create a " + kind). Operation("create" + kind). Reads(versionedObject) addParams(route, action.Params) ws.Route(route) case "DELETE": // Delete a resource. - route := ws.DELETE(action.Path).To(restVerbHandler). + route := ws.DELETE(action.Path).To(DeleteResource(deleter, nameFn, linkFn, codec, resource, kind, admit)). Doc("delete a " + kind). Operation("delete" + kind) addParams(route, action.Params) @@ -281,6 +369,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, action.Params) addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, action.Params) addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, action.Params) + default: + return fmt.Errorf("unrecognized action verb: %s", action.Verb) } // Note: update GetAttribs() when adding a custom handler. } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 738639eb257..682634ca034 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -89,9 +89,9 @@ type defaultAPIServer struct { // as RESTful resources at prefix, serialized by codec, and also includes the support // http resources. // Note: This method is used only in tests. -func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, selfLinker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) http.Handler { - prefix := root + "/" + version - group := NewAPIGroupVersion(storage, codec, root, prefix, selfLinker, admissionControl, mapper) +func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, linker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) http.Handler { + prefix := path.Join(root, version) + group := NewAPIGroupVersion(storage, codec, root, prefix, linker, admissionControl, mapper) container := restful.NewContainer() container.Router(restful.CurlyRouter{}) mux := container.ServeMux @@ -102,16 +102,19 @@ func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, ve return &defaultAPIServer{mux, group} } -// TODO: This is a whole API version right now. Maybe should rename it. -// APIGroupVersion is a http.Handler that exposes multiple RESTStorage objects +// APIGroupVersion is a helper for exposing RESTStorage objects as http.Handlers via go-restful // It handles URLs of the form: // /${storage_key}[/${object_name}] // Where 'storage_key' points to a RESTStorage object stored in storage. -// -// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing. type APIGroupVersion struct { - handler RESTHandler + storage map[string]RESTStorage + codec runtime.Codec + prefix string + linker runtime.SelfLinker + admit admission.Interface mapper meta.RESTMapper + // TODO: put me into a cleaner interface + info *APIRequestInfoResolver } // NewAPIGroupVersion returns an object that will serve a set of REST resources and their @@ -119,18 +122,15 @@ type APIGroupVersion struct { // This is a helper method for registering multiple sets of REST handlers under different // prefixes onto a server. // TODO: add multitype codec serialization -func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, apiRoot, canonicalPrefix string, selfLinker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) *APIGroupVersion { +func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, root, prefix string, linker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) *APIGroupVersion { return &APIGroupVersion{ - handler: RESTHandler{ - storage: storage, - codec: codec, - canonicalPrefix: canonicalPrefix, - selfLinker: selfLinker, - ops: NewOperations(), - admissionControl: admissionControl, - apiRequestInfoResolver: &APIRequestInfoResolver{util.NewStringSet(apiRoot), latest.RESTMapper}, - }, - mapper: mapper, + storage: storage, + codec: codec, + prefix: prefix, + linker: linker, + admit: admissionControl, + mapper: mapper, + info: &APIRequestInfoResolver{util.NewStringSet(root), latest.RESTMapper}, } } @@ -139,7 +139,12 @@ func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, api // in a slash. A restful WebService is created for the group and version. func (g *APIGroupVersion) InstallREST(container *restful.Container, root string, version string) error { prefix := path.Join(root, version) - ws, registrationErrors := (&APIInstaller{prefix, version, &g.handler, g.mapper}).Install() + installer := &APIInstaller{ + group: g, + prefix: prefix, + version: version, + } + ws, registrationErrors := installer.Install() container.Add(ws) return errors.NewAggregate(registrationErrors) } @@ -186,15 +191,15 @@ func AddApiWebService(container *restful.Container, apiPrefix string, versions [ // TODO: InstallREST should register each version automatically versionHandler := APIVersionHandler(versions[:]...) - getApiVersionsWebService := new(restful.WebService) - getApiVersionsWebService.Path(apiPrefix) - getApiVersionsWebService.Doc("get available api versions") - getApiVersionsWebService.Route(getApiVersionsWebService.GET("/").To(versionHandler). - Doc("get available api versions"). - Operation("getApiVersions"). + ws := new(restful.WebService) + ws.Path(apiPrefix) + ws.Doc("get available API versions") + ws.Route(ws.GET("/").To(versionHandler). + Doc("get available API versions"). + Operation("getAPIVersions"). Produces(restful.MIME_JSON). Consumes(restful.MIME_JSON)) - container.Add(getApiVersionsWebService) + container.Add(ws) } // handleVersion writes the server's version information. diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 2adca83fbce..f00e66076a7 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -489,7 +489,9 @@ func TestGet(t *testing.T) { } selfLinker := &setTestSelfLinker{ t: t, - expectedSet: "/prefix/version/simple/id", + expectedSet: "/prefix/version/simple/id?namespace=default", + name: "id", + namespace: "default", } storage["simple"] = &simpleStorage handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) @@ -497,6 +499,12 @@ func TestGet(t *testing.T) { defer server.Close() resp, err := http.Get(server.URL + "/prefix/version/simple/id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } var itemOut Simple body, err := extractBody(resp, &itemOut) if err != nil { @@ -511,6 +519,81 @@ func TestGet(t *testing.T) { } } +func TestGetAlternateSelfLink(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/prefix/version/simple/id?namespace=test", + name: "id", + namespace: "test", + } + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, legacyNamespaceMapper) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/prefix/version/simple/id?namespace=test") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } +} + +func TestGetNamespaceSelfLink(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/prefix/version/namespaces/foo/simple/id", + name: "id", + namespace: "foo", + } + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, namespaceMapper) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/prefix/version/namespaces/foo/simple/id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } +} func TestGetMissing(t *testing.T) { storage := map[string]RESTStorage{} simpleStorage := SimpleRESTStorage{ @@ -542,11 +625,13 @@ func TestDelete(t *testing.T) { client := http.Client{} request, err := http.NewRequest("DELETE", server.URL+"/prefix/version/simple/"+ID, nil) - _, err = client.Do(request) + res, err := client.Do(request) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) + } + if res.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", res) } - if simpleStorage.deleted != ID { t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID) } @@ -602,13 +687,19 @@ func TestUpdate(t *testing.T) { storage["simple"] = &simpleStorage selfLinker := &setTestSelfLinker{ t: t, - expectedSet: "/prefix/version/simple/" + ID, + expectedSet: "/prefix/version/simple/" + ID + "?namespace=default", + name: ID, + namespace: api.NamespaceDefault, } handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) server := httptest.NewServer(handler) defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: "", // update should allow the client to send an empty namespace + }, Other: "bar", } body, err := codec.Encode(item) @@ -637,15 +728,15 @@ func TestUpdateInvokesAdmissionControl(t *testing.T) { simpleStorage := SimpleRESTStorage{} ID := "id" storage["simple"] = &simpleStorage - selfLinker := &setTestSelfLinker{ - t: t, - expectedSet: "/prefix/version/simple/" + ID, - } handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny(), mapper) server := httptest.NewServer(handler) defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: api.NamespaceDefault, + }, Other: "bar", } body, err := codec.Encode(item) @@ -665,6 +756,100 @@ func TestUpdateInvokesAdmissionControl(t *testing.T) { } } +func TestUpdateRequiresMatchingName(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny(), mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusBadRequest { + t.Errorf("Unexpected response %#v", response) + } +} + +func TestUpdateAllowsMissingNamespace(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + }, + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusOK { + t.Errorf("Unexpected response %#v", response) + } +} + +func TestUpdatePreventsMismatchedNamespace(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: "other", + }, + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusBadRequest { + t.Errorf("Unexpected response %#v", response) + } +} + func TestUpdateMissing(t *testing.T) { storage := map[string]RESTStorage{} ID := "id" @@ -677,6 +862,10 @@ func TestUpdateMissing(t *testing.T) { defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: api.NamespaceDefault, + }, Other: "bar", } body, err := codec.Encode(item) @@ -690,7 +879,6 @@ func TestUpdateMissing(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if response.StatusCode != http.StatusNotFound { t.Errorf("Unexpected response %#v", response) } @@ -961,7 +1149,7 @@ func TestCreateTimeout(t *testing.T) { simple := &Simple{Other: "foo"} data, _ := codec.Encode(simple) - itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, http.StatusAccepted) + itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, apierrs.StatusTryAgainLater) if itemOut.Status != api.StatusFailure || itemOut.Reason != api.StatusReasonTimeout { t.Errorf("Unexpected status %#v", itemOut) } diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index 91b7e564147..a05d7afffb7 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -35,7 +35,21 @@ func errToAPIStatus(err error) *api.Status { switch t := err.(type) { case statusError: status := t.Status() - status.Status = api.StatusFailure + if len(status.Status) == 0 { + } + switch status.Status { + case api.StatusSuccess: + if status.Code == 0 { + status.Code = http.StatusOK + } + case "": + status.Status = api.StatusFailure + fallthrough + case api.StatusFailure: + if status.Code == 0 { + status.Code = http.StatusInternalServerError + } + } //TODO: check for invalid responses return &status default: diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 94f58a6747e..4bf2329e00f 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -56,14 +56,22 @@ type RESTDeleter interface { } type RESTCreater interface { + // New returns an empty object that can be used with Create after request data has been put into it. + // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) + New() runtime.Object + // Create creates a new version of a resource. Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) } type RESTUpdater interface { + // New returns an empty object that can be used with Update after request data has been put into it. + // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) + New() runtime.Object + // Update finds a resource in the storage and updates it. Some implementations // may allow updates creates the object - they should set the Created flag of - // the returned RESTResultto true. In the event of an asynchronous error returned + // the returned RESTResult to true. In the event of an asynchronous error returned // via an api.Status object, the Created flag is ignored. Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) } diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index e8cb3a35e92..11e725aa19e 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -17,8 +17,8 @@ limitations under the License. package apiserver import ( + "fmt" "net/http" - "path" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" @@ -27,73 +27,322 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/golang/glog" + "github.com/emicklei/go-restful" ) -// RESTHandler implements HTTP verbs on a set of RESTful resources identified by name. -type RESTHandler struct { - storage map[string]RESTStorage - codec runtime.Codec - canonicalPrefix string - selfLinker runtime.SelfLinker - ops *Operations - admissionControl admission.Interface - apiRequestInfoResolver *APIRequestInfoResolver +// ResourceNameFunc returns a name (and optional namespace) given a request - if no name is present +// an error must be returned. +type ResourceNameFunc func(req *restful.Request) (namespace, name string, err error) + +// ObjectNameFunc returns the name (and optional namespace) of an object +type ObjectNameFunc func(obj runtime.Object) (namespace, name string, err error) + +// ResourceNamespaceFunc returns the namespace associated with the given request - if no namespace +// is present an error must be returned. +type ResourceNamespaceFunc func(req *restful.Request) (namespace string, err error) + +// LinkResourceFunc updates the provided object with a SelfLink that is appropriate for the current +// request. +type LinkResourceFunc func(req *restful.Request, obj runtime.Object) error + +// GetResource returns a function that handles retrieving a single resource from a RESTStorage object. +func GetResource(r RESTGetter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + item, err := r.Get(ctx, name) + if err != nil { + errorJSON(err, codec, w) + return + } + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + writeJSON(http.StatusOK, codec, item, w) + } } -// ServeHTTP handles requests to all RESTStorage objects. -func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - var verb string - var apiResource string - var httpCode int - reqStart := time.Now() - defer func() { monitor("rest", verb, apiResource, httpCode, reqStart) }() +// ListResource returns a function that handles retrieving a list of resources from a RESTStorage object. +func ListResource(r RESTLister, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter - requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) - if err != nil { - glog.Errorf("Unable to handle request %s %s %v", requestInfo.Namespace, requestInfo.Kind, err) - notFound(w, req) - httpCode = http.StatusNotFound - return + namespace, err := namespaceFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + label, err := labels.ParseSelector(req.Request.URL.Query().Get("labels")) + if err != nil { + errorJSON(err, codec, w) + return + } + field, err := labels.ParseSelector(req.Request.URL.Query().Get("fields")) + if err != nil { + errorJSON(err, codec, w) + return + } + + item, err := r.List(ctx, label, field) + if err != nil { + errorJSON(err, codec, w) + return + } + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + writeJSON(http.StatusOK, codec, item, w) } - verb = requestInfo.Verb - - storage, ok := h.storage[requestInfo.Resource] - if !ok { - notFound(w, req) - httpCode = http.StatusNotFound - return - } - apiResource = requestInfo.Resource - - httpCode = h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource) } -// Sets the SelfLink field of the object. -func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error { - newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path) - newURL.RawQuery = "" - newURL.Fragment = "" - namespace, err := h.selfLinker.Namespace(obj) +// CreateResource returns a function that will handle a resource creation. +func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, err := namespaceFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + body, err := readBody(req.Request) + if err != nil { + errorJSON(err, codec, w) + return + } + + obj := r.New() + if err := codec.DecodeInto(body, obj); err != nil { + errorJSON(err, codec, w) + return + } + + err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "CREATE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + out, err := r.Create(ctx, obj) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(out, timeout, codec) + if err != nil { + errorJSON(err, codec, w) + return + } + + item := result.Object + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + + status := http.StatusOK + if result.Created { + status = http.StatusCreated + } + writeJSON(status, codec, item, w) + } +} + +// UpdateResource returns a function that will handle a resource update +func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + body, err := readBody(req.Request) + if err != nil { + errorJSON(err, codec, w) + return + } + + obj := r.New() + if err := codec.DecodeInto(body, obj); err != nil { + errorJSON(err, codec, w) + return + } + + objNamespace, objName, err := objNameFunc(obj) + if err != nil { + errorJSON(err, codec, w) + return + } + if objName != name { + errorJSON(errors.NewBadRequest("the name of the object does not match the name on the URL"), codec, w) + return + } + if len(namespace) > 0 { + if len(objNamespace) > 0 && objNamespace != namespace { + errorJSON(errors.NewBadRequest("the namespace of the object does not match the namespace on the request"), codec, w) + return + } + } + + err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "UPDATE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + out, err := r.Update(ctx, obj) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(out, timeout, codec) + if err != nil { + errorJSON(err, codec, w) + return + } + + item := result.Object + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + + status := http.StatusOK + if result.Created { + status = http.StatusCreated + } + writeJSON(status, codec, item, w) + } +} + +// DeleteResource returns a function that will handle a resource deletion +func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource, kind string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + err = admit.Admit(admission.NewAttributesRecord(nil, namespace, resource, "DELETE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + out, err := r.Delete(ctx, name) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(out, timeout, codec) + if err != nil { + errorJSON(err, codec, w) + return + } + + // if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid + // object with the response. + item := result.Object + if item == nil { + item = &api.Status{ + Status: api.StatusSuccess, + Code: http.StatusOK, + Details: &api.StatusDetails{ + ID: name, + Kind: kind, + }, + } + } + writeJSON(http.StatusOK, codec, item, w) + } +} + +// finishRequest waits for the result channel to close or clear, and writes the appropriate response. +// Any api.Status object returned is considered an "error", which interrupts the normal response flow. +func finishRequest(ch <-chan RESTResult, timeout time.Duration, codec runtime.Codec) (*RESTResult, error) { + select { + case result, ok := <-ch: + if !ok { + // likely programming error + return nil, fmt.Errorf("operation channel closed without returning result") + } + if status, ok := result.Object.(*api.Status); ok { + return nil, errors.FromObject(status) + } + return &result, nil + case <-time.After(timeout): + return nil, errors.NewTimeoutError("request did not complete within allowed duration") + } +} + +type linkFunc func(namespace, name string) (path string, query string) + +// setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request +// plus the path and query generated by the provided linkFunc +func setSelfLink(obj runtime.Object, req *http.Request, linker runtime.SelfLinker, fn linkFunc) error { + namespace, err := linker.Namespace(obj) if err != nil { return err } - - // we need to add namespace as a query param, if its not in the resource path - if len(namespace) > 0 { - parts := splitPath(req.URL.Path) - if parts[0] != "ns" { - query := newURL.Query() - query.Set("namespace", namespace) - newURL.RawQuery = query.Encode() - } - } - - err = h.selfLinker.SetSelfLink(obj, newURL.String()) + name, err := linker.Name(obj) if err != nil { return err } + path, query := fn(namespace, name) + + newURL := *req.URL + newURL.Path = path + newURL.RawQuery = query + newURL.Fragment = "" + + if err := linker.SetSelfLink(obj, newURL.String()); err != nil { + return err + } if !runtime.IsListType(obj) { return nil } @@ -104,231 +353,9 @@ func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error { return err } for i := range items { - if err := h.setSelfLinkAddName(items[i], req); err != nil { + if err := setSelfLink(items[i], req, linker, fn); err != nil { return err } } return runtime.SetList(obj, items) } - -// Like setSelfLink, but appends the object's name. -func (h *RESTHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { - name, err := h.selfLinker.Name(obj) - if err != nil { - return err - } - namespace, err := h.selfLinker.Namespace(obj) - if err != nil { - return err - } - newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name) - newURL.RawQuery = "" - newURL.Fragment = "" - // we need to add namespace as a query param, if its not in the resource path - if len(namespace) > 0 { - parts := splitPath(req.URL.Path) - if parts[0] != "ns" { - query := newURL.Query() - query.Set("namespace", namespace) - newURL.RawQuery = query.Encode() - } - } - return h.selfLinker.SetSelfLink(obj, newURL.String()) -} - -// curry adapts either of the self link setting functions into a function appropriate for operation's hook. -func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(RESTResult) { - return func(obj RESTResult) { - if err := f(obj.Object, req); err != nil { - glog.Errorf("unable to set self link for %#v: %v", obj, err) - } - } -} - -// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then -// on path length, according to the following table: -// Method Path Action -// GET /foo list -// GET /foo/bar get 'bar' -// POST /foo create -// PUT /foo/bar update 'bar' -// DELETE /foo/bar delete 'bar' -// Responds with a 404 if the method/pattern doesn't match one of these entries. -// The s accepts several query parameters: -// timeout= Timeout for synchronous requests -// labels= Used for filtering list operations -// Returns the HTTP status code written to the response. -func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) int { - ctx := api.WithNamespace(api.NewContext(), namespace) - // TODO: Document the timeout query parameter. - timeout := parseTimeout(req.URL.Query().Get("timeout")) - switch req.Method { - case "GET": - switch len(parts) { - case 1: - label, err := labels.ParseSelector(req.URL.Query().Get("labels")) - if err != nil { - return errorJSON(err, h.codec, w) - } - field, err := labels.ParseSelector(req.URL.Query().Get("fields")) - if err != nil { - return errorJSON(err, h.codec, w) - } - lister, ok := storage.(RESTLister) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w) - } - list, err := lister.List(ctx, label, field) - if err != nil { - return errorJSON(err, h.codec, w) - } - if err := h.setSelfLink(list, req); err != nil { - return errorJSON(err, h.codec, w) - } - writeJSON(http.StatusOK, h.codec, list, w) - case 2: - getter, ok := storage.(RESTGetter) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w) - } - item, err := getter.Get(ctx, parts[1]) - if err != nil { - return errorJSON(err, h.codec, w) - } - if err := h.setSelfLink(item, req); err != nil { - return errorJSON(err, h.codec, w) - } - writeJSON(http.StatusOK, h.codec, item, w) - default: - notFound(w, req) - return http.StatusNotFound - } - - case "POST": - if len(parts) != 1 { - notFound(w, req) - return http.StatusNotFound - } - creater, ok := storage.(RESTCreater) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) - } - - body, err := readBody(req) - if err != nil { - return errorJSON(err, h.codec, w) - } - obj := storage.New() - err = h.codec.DecodeInto(body, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - - // invoke admission control - err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := creater.Create(ctx, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req)) - return h.finishReq(op, req, w) - - case "DELETE": - if len(parts) != 2 { - notFound(w, req) - return http.StatusNotFound - } - deleter, ok := storage.(RESTDeleter) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w) - } - - // invoke admission control - err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := deleter.Delete(ctx, parts[1]) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, nil) - return h.finishReq(op, req, w) - - case "PUT": - if len(parts) != 2 { - notFound(w, req) - return http.StatusNotFound - } - updater, ok := storage.(RESTUpdater) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) - } - - body, err := readBody(req) - if err != nil { - return errorJSON(err, h.codec, w) - } - obj := storage.New() - err = h.codec.DecodeInto(body, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - - // invoke admission control - err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := updater.Update(ctx, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, curry(h.setSelfLink, req)) - return h.finishReq(op, req, w) - - default: - notFound(w, req) - return http.StatusNotFound - } - return http.StatusOK -} - -// createOperation creates an operation to process a channel response. -func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Duration, onReceive func(RESTResult)) *Operation { - op := h.ops.NewOperation(out, onReceive) - op.WaitFor(timeout) - return op -} - -// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an -// Operation to receive the result and returning its ID down the writer. -// Returns the HTTP status code written to the response. -func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) int { - result, complete := op.StatusOrResult() - obj := result.Object - var status int - if complete { - status = http.StatusOK - if result.Created { - status = http.StatusCreated - } - switch stat := obj.(type) { - case *api.Status: - if stat.Code != 0 { - status = stat.Code - } - } - } else { - status = http.StatusAccepted - } - writeJSON(status, h.codec, obj, w) - return status -} diff --git a/pkg/apiserver/resthandler_test.go b/pkg/apiserver/resthandler_test.go deleted file mode 100644 index 2f471beda19..00000000000 --- a/pkg/apiserver/resthandler_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" -) - -func TestFinishReq(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Object: &api.Status{Code: http.StatusNotFound}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - status := &api.Status{} - if err := json.Unmarshal([]byte(resp.Body.String()), status); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusNotFound || status.Code != http.StatusNotFound { - t.Errorf("unexpected status: %#v", status) - } -} - -func TestFinishReqUnwrap(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - obj := &api.Pod{} - if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusCreated || obj.Name != "foo" { - t.Errorf("unexpected object: %#v", obj) - } -} - -func TestFinishReqUnwrapStatus(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Status{Code: http.StatusNotFound}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - obj := &api.Status{} - if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusNotFound || obj.Code != http.StatusNotFound { - t.Errorf("unexpected object: %#v", obj) - } -} diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 9f6578be991..ae3178504d0 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -37,24 +37,24 @@ import ( ) type WatchHandler struct { - storage map[string]RESTStorage - codec runtime.Codec - canonicalPrefix string - selfLinker runtime.SelfLinker - apiRequestInfoResolver *APIRequestInfoResolver + storage map[string]RESTStorage + codec runtime.Codec + prefix string + linker runtime.SelfLinker + info *APIRequestInfoResolver } // setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type. func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { - name, err := h.selfLinker.Name(obj) + name, err := h.linker.Name(obj) if err != nil { return err } newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name) + newURL.Path = path.Join(h.prefix, req.URL.Path, name) newURL.RawQuery = "" newURL.Fragment = "" - return h.selfLinker.SetSelfLink(obj, newURL.String()) + return h.linker.SetSelfLink(obj, newURL.String()) } func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string, err error) { @@ -96,7 +96,7 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) + requestInfo, err := h.info.GetAPIRequestInfo(req) if err != nil { notFound(w, req) httpCode = http.StatusNotFound diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 397c2c4b020..63b66e51a08 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -78,6 +78,20 @@ var aPod string = ` }%s } ` +var aPodInBar string = ` +{ + "kind": "Pod", + "apiVersion": "v1beta1", + "id": "a", + "desiredState": { + "manifest": { + "version": "v1beta1", + "id": "a", + "containers": [{ "name": "foo", "image": "bar/foo", }] + } + }%s +} +` var aRC string = ` { "kind": "ReplicationController", @@ -126,7 +140,6 @@ var aEvent string = ` { "kind": "Event", "apiVersion": "v1beta1", - "namespace": "default", "id": "a", "involvedObject": { "kind": "Minion", @@ -316,14 +329,16 @@ func TestAuthModeAlwaysAllow(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := "default" + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) @@ -483,14 +498,16 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := "default" + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) @@ -705,24 +722,25 @@ func TestNamespaceAuthorization(t *testing.T) { requests := []struct { verb string URL string + namespace string body string statusCodes map[int]bool // allowed status codes. }{ - {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", aPod, code200}, - {"GET", "/api/v1beta1/pods?namespace=foo", "", code200}, - {"GET", "/api/v1beta1/pods/a?namespace=foo", "", code200}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "", code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", "foo", aPod, code200}, + {"GET", "/api/v1beta1/pods?namespace=foo", "foo", "", code200}, + {"GET", "/api/v1beta1/pods/a?namespace=foo", "foo", "", code200}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "foo", "", code200}, - {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=bar", aPod, code403}, - {"GET", "/api/v1beta1/pods?namespace=bar", "", code403}, - {"GET", "/api/v1beta1/pods/a?namespace=bar", "", code403}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=bar", "", code403}, + {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=bar", "bar", aPod, code403}, + {"GET", "/api/v1beta1/pods?namespace=bar", "bar", "", code403}, + {"GET", "/api/v1beta1/pods/a?namespace=bar", "bar", "", code403}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=bar", "bar", "", code403}, - {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code403}, - {"GET", "/api/v1beta1/pods", "", code403}, - {"GET", "/api/v1beta1/pods/a", "", code403}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", code403}, + {"POST", "/api/v1beta1/pods" + timeoutFlag, "", aPod, code403}, + {"GET", "/api/v1beta1/pods", "", "", code403}, + {"GET", "/api/v1beta1/pods/a", "", "", code403}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", "", code403}, } for _, r := range requests { @@ -730,14 +748,19 @@ func TestNamespaceAuthorization(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := r.namespace + if len(namespace) == 0 { + namespace = "default" + } + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes)