Remove layers of indirection between apiinstaller and resthandler

Make the RESTHandler feel more go-restful, set the stage for adding
new types of subresource collections.
This commit is contained in:
Clayton Coleman
2015-02-09 09:47:13 -05:00
parent 55e8357c0f
commit d167c11b59
10 changed files with 747 additions and 444 deletions

View File

@@ -17,8 +17,8 @@ limitations under the License.
package apiserver
import (
"fmt"
"net/http"
"path"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
@@ -27,73 +27,322 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/golang/glog"
"github.com/emicklei/go-restful"
)
// RESTHandler implements HTTP verbs on a set of RESTful resources identified by name.
type RESTHandler struct {
storage map[string]RESTStorage
codec runtime.Codec
canonicalPrefix string
selfLinker runtime.SelfLinker
ops *Operations
admissionControl admission.Interface
apiRequestInfoResolver *APIRequestInfoResolver
// ResourceNameFunc returns a name (and optional namespace) given a request - if no name is present
// an error must be returned.
type ResourceNameFunc func(req *restful.Request) (namespace, name string, err error)
// ObjectNameFunc returns the name (and optional namespace) of an object
type ObjectNameFunc func(obj runtime.Object) (namespace, name string, err error)
// ResourceNamespaceFunc returns the namespace associated with the given request - if no namespace
// is present an error must be returned.
type ResourceNamespaceFunc func(req *restful.Request) (namespace string, err error)
// LinkResourceFunc updates the provided object with a SelfLink that is appropriate for the current
// request.
type LinkResourceFunc func(req *restful.Request, obj runtime.Object) error
// GetResource returns a function that handles retrieving a single resource from a RESTStorage object.
func GetResource(r RESTGetter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
namespace, name, err := nameFn(req)
if err != nil {
notFound(w, req.Request)
return
}
ctx := api.NewContext()
if len(namespace) > 0 {
ctx = api.WithNamespace(ctx, namespace)
}
item, err := r.Get(ctx, name)
if err != nil {
errorJSON(err, codec, w)
return
}
if err := linkFn(req, item); err != nil {
errorJSON(err, codec, w)
return
}
writeJSON(http.StatusOK, codec, item, w)
}
}
// ServeHTTP handles requests to all RESTStorage objects.
func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var verb string
var apiResource string
var httpCode int
reqStart := time.Now()
defer func() { monitor("rest", verb, apiResource, httpCode, reqStart) }()
// ListResource returns a function that handles retrieving a list of resources from a RESTStorage object.
func ListResource(r RESTLister, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req)
if err != nil {
glog.Errorf("Unable to handle request %s %s %v", requestInfo.Namespace, requestInfo.Kind, err)
notFound(w, req)
httpCode = http.StatusNotFound
return
namespace, err := namespaceFn(req)
if err != nil {
notFound(w, req.Request)
return
}
ctx := api.NewContext()
if len(namespace) > 0 {
ctx = api.WithNamespace(ctx, namespace)
}
label, err := labels.ParseSelector(req.Request.URL.Query().Get("labels"))
if err != nil {
errorJSON(err, codec, w)
return
}
field, err := labels.ParseSelector(req.Request.URL.Query().Get("fields"))
if err != nil {
errorJSON(err, codec, w)
return
}
item, err := r.List(ctx, label, field)
if err != nil {
errorJSON(err, codec, w)
return
}
if err := linkFn(req, item); err != nil {
errorJSON(err, codec, w)
return
}
writeJSON(http.StatusOK, codec, item, w)
}
verb = requestInfo.Verb
storage, ok := h.storage[requestInfo.Resource]
if !ok {
notFound(w, req)
httpCode = http.StatusNotFound
return
}
apiResource = requestInfo.Resource
httpCode = h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource)
}
// Sets the SelfLink field of the object.
func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error {
newURL := *req.URL
newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path)
newURL.RawQuery = ""
newURL.Fragment = ""
namespace, err := h.selfLinker.Namespace(obj)
// CreateResource returns a function that will handle a resource creation.
func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
namespace, err := namespaceFn(req)
if err != nil {
notFound(w, req.Request)
return
}
ctx := api.NewContext()
if len(namespace) > 0 {
ctx = api.WithNamespace(ctx, namespace)
}
body, err := readBody(req.Request)
if err != nil {
errorJSON(err, codec, w)
return
}
obj := r.New()
if err := codec.DecodeInto(body, obj); err != nil {
errorJSON(err, codec, w)
return
}
err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "CREATE"))
if err != nil {
errorJSON(err, codec, w)
return
}
out, err := r.Create(ctx, obj)
if err != nil {
errorJSON(err, codec, w)
return
}
result, err := finishRequest(out, timeout, codec)
if err != nil {
errorJSON(err, codec, w)
return
}
item := result.Object
if err := linkFn(req, item); err != nil {
errorJSON(err, codec, w)
return
}
status := http.StatusOK
if result.Created {
status = http.StatusCreated
}
writeJSON(status, codec, item, w)
}
}
// UpdateResource returns a function that will handle a resource update
func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
namespace, name, err := nameFn(req)
if err != nil {
notFound(w, req.Request)
return
}
ctx := api.NewContext()
if len(namespace) > 0 {
ctx = api.WithNamespace(ctx, namespace)
}
body, err := readBody(req.Request)
if err != nil {
errorJSON(err, codec, w)
return
}
obj := r.New()
if err := codec.DecodeInto(body, obj); err != nil {
errorJSON(err, codec, w)
return
}
objNamespace, objName, err := objNameFunc(obj)
if err != nil {
errorJSON(err, codec, w)
return
}
if objName != name {
errorJSON(errors.NewBadRequest("the name of the object does not match the name on the URL"), codec, w)
return
}
if len(namespace) > 0 {
if len(objNamespace) > 0 && objNamespace != namespace {
errorJSON(errors.NewBadRequest("the namespace of the object does not match the namespace on the request"), codec, w)
return
}
}
err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "UPDATE"))
if err != nil {
errorJSON(err, codec, w)
return
}
out, err := r.Update(ctx, obj)
if err != nil {
errorJSON(err, codec, w)
return
}
result, err := finishRequest(out, timeout, codec)
if err != nil {
errorJSON(err, codec, w)
return
}
item := result.Object
if err := linkFn(req, item); err != nil {
errorJSON(err, codec, w)
return
}
status := http.StatusOK
if result.Created {
status = http.StatusCreated
}
writeJSON(status, codec, item, w)
}
}
// DeleteResource returns a function that will handle a resource deletion
func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource, kind string, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
// TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
namespace, name, err := nameFn(req)
if err != nil {
notFound(w, req.Request)
return
}
ctx := api.NewContext()
if len(namespace) > 0 {
ctx = api.WithNamespace(ctx, namespace)
}
err = admit.Admit(admission.NewAttributesRecord(nil, namespace, resource, "DELETE"))
if err != nil {
errorJSON(err, codec, w)
return
}
out, err := r.Delete(ctx, name)
if err != nil {
errorJSON(err, codec, w)
return
}
result, err := finishRequest(out, timeout, codec)
if err != nil {
errorJSON(err, codec, w)
return
}
// if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid
// object with the response.
item := result.Object
if item == nil {
item = &api.Status{
Status: api.StatusSuccess,
Code: http.StatusOK,
Details: &api.StatusDetails{
ID: name,
Kind: kind,
},
}
}
writeJSON(http.StatusOK, codec, item, w)
}
}
// finishRequest waits for the result channel to close or clear, and writes the appropriate response.
// Any api.Status object returned is considered an "error", which interrupts the normal response flow.
func finishRequest(ch <-chan RESTResult, timeout time.Duration, codec runtime.Codec) (*RESTResult, error) {
select {
case result, ok := <-ch:
if !ok {
// likely programming error
return nil, fmt.Errorf("operation channel closed without returning result")
}
if status, ok := result.Object.(*api.Status); ok {
return nil, errors.FromObject(status)
}
return &result, nil
case <-time.After(timeout):
return nil, errors.NewTimeoutError("request did not complete within allowed duration")
}
}
type linkFunc func(namespace, name string) (path string, query string)
// setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request
// plus the path and query generated by the provided linkFunc
func setSelfLink(obj runtime.Object, req *http.Request, linker runtime.SelfLinker, fn linkFunc) error {
namespace, err := linker.Namespace(obj)
if err != nil {
return err
}
// we need to add namespace as a query param, if its not in the resource path
if len(namespace) > 0 {
parts := splitPath(req.URL.Path)
if parts[0] != "ns" {
query := newURL.Query()
query.Set("namespace", namespace)
newURL.RawQuery = query.Encode()
}
}
err = h.selfLinker.SetSelfLink(obj, newURL.String())
name, err := linker.Name(obj)
if err != nil {
return err
}
path, query := fn(namespace, name)
newURL := *req.URL
newURL.Path = path
newURL.RawQuery = query
newURL.Fragment = ""
if err := linker.SetSelfLink(obj, newURL.String()); err != nil {
return err
}
if !runtime.IsListType(obj) {
return nil
}
@@ -104,231 +353,9 @@ func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error {
return err
}
for i := range items {
if err := h.setSelfLinkAddName(items[i], req); err != nil {
if err := setSelfLink(items[i], req, linker, fn); err != nil {
return err
}
}
return runtime.SetList(obj, items)
}
// Like setSelfLink, but appends the object's name.
func (h *RESTHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error {
name, err := h.selfLinker.Name(obj)
if err != nil {
return err
}
namespace, err := h.selfLinker.Namespace(obj)
if err != nil {
return err
}
newURL := *req.URL
newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name)
newURL.RawQuery = ""
newURL.Fragment = ""
// we need to add namespace as a query param, if its not in the resource path
if len(namespace) > 0 {
parts := splitPath(req.URL.Path)
if parts[0] != "ns" {
query := newURL.Query()
query.Set("namespace", namespace)
newURL.RawQuery = query.Encode()
}
}
return h.selfLinker.SetSelfLink(obj, newURL.String())
}
// curry adapts either of the self link setting functions into a function appropriate for operation's hook.
func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(RESTResult) {
return func(obj RESTResult) {
if err := f(obj.Object, req); err != nil {
glog.Errorf("unable to set self link for %#v: %v", obj, err)
}
}
}
// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then
// on path length, according to the following table:
// Method Path Action
// GET /foo list
// GET /foo/bar get 'bar'
// POST /foo create
// PUT /foo/bar update 'bar'
// DELETE /foo/bar delete 'bar'
// Responds with a 404 if the method/pattern doesn't match one of these entries.
// The s accepts several query parameters:
// timeout=<duration> Timeout for synchronous requests
// labels=<label-selector> Used for filtering list operations
// Returns the HTTP status code written to the response.
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) int {
ctx := api.WithNamespace(api.NewContext(), namespace)
// TODO: Document the timeout query parameter.
timeout := parseTimeout(req.URL.Query().Get("timeout"))
switch req.Method {
case "GET":
switch len(parts) {
case 1:
label, err := labels.ParseSelector(req.URL.Query().Get("labels"))
if err != nil {
return errorJSON(err, h.codec, w)
}
field, err := labels.ParseSelector(req.URL.Query().Get("fields"))
if err != nil {
return errorJSON(err, h.codec, w)
}
lister, ok := storage.(RESTLister)
if !ok {
return errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w)
}
list, err := lister.List(ctx, label, field)
if err != nil {
return errorJSON(err, h.codec, w)
}
if err := h.setSelfLink(list, req); err != nil {
return errorJSON(err, h.codec, w)
}
writeJSON(http.StatusOK, h.codec, list, w)
case 2:
getter, ok := storage.(RESTGetter)
if !ok {
return errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w)
}
item, err := getter.Get(ctx, parts[1])
if err != nil {
return errorJSON(err, h.codec, w)
}
if err := h.setSelfLink(item, req); err != nil {
return errorJSON(err, h.codec, w)
}
writeJSON(http.StatusOK, h.codec, item, w)
default:
notFound(w, req)
return http.StatusNotFound
}
case "POST":
if len(parts) != 1 {
notFound(w, req)
return http.StatusNotFound
}
creater, ok := storage.(RESTCreater)
if !ok {
return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w)
}
body, err := readBody(req)
if err != nil {
return errorJSON(err, h.codec, w)
}
obj := storage.New()
err = h.codec.DecodeInto(body, obj)
if err != nil {
return errorJSON(err, h.codec, w)
}
// invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE"))
if err != nil {
return errorJSON(err, h.codec, w)
}
out, err := creater.Create(ctx, obj)
if err != nil {
return errorJSON(err, h.codec, w)
}
op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req))
return h.finishReq(op, req, w)
case "DELETE":
if len(parts) != 2 {
notFound(w, req)
return http.StatusNotFound
}
deleter, ok := storage.(RESTDeleter)
if !ok {
return errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w)
}
// invoke admission control
err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE"))
if err != nil {
return errorJSON(err, h.codec, w)
}
out, err := deleter.Delete(ctx, parts[1])
if err != nil {
return errorJSON(err, h.codec, w)
}
op := h.createOperation(out, timeout, nil)
return h.finishReq(op, req, w)
case "PUT":
if len(parts) != 2 {
notFound(w, req)
return http.StatusNotFound
}
updater, ok := storage.(RESTUpdater)
if !ok {
return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w)
}
body, err := readBody(req)
if err != nil {
return errorJSON(err, h.codec, w)
}
obj := storage.New()
err = h.codec.DecodeInto(body, obj)
if err != nil {
return errorJSON(err, h.codec, w)
}
// invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE"))
if err != nil {
return errorJSON(err, h.codec, w)
}
out, err := updater.Update(ctx, obj)
if err != nil {
return errorJSON(err, h.codec, w)
}
op := h.createOperation(out, timeout, curry(h.setSelfLink, req))
return h.finishReq(op, req, w)
default:
notFound(w, req)
return http.StatusNotFound
}
return http.StatusOK
}
// createOperation creates an operation to process a channel response.
func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Duration, onReceive func(RESTResult)) *Operation {
op := h.ops.NewOperation(out, onReceive)
op.WaitFor(timeout)
return op
}
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
// Operation to receive the result and returning its ID down the writer.
// Returns the HTTP status code written to the response.
func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) int {
result, complete := op.StatusOrResult()
obj := result.Object
var status int
if complete {
status = http.StatusOK
if result.Created {
status = http.StatusCreated
}
switch stat := obj.(type) {
case *api.Status:
if stat.Code != 0 {
status = stat.Code
}
}
} else {
status = http.StatusAccepted
}
writeJSON(status, h.codec, obj, w)
return status
}