From 49abf9133e7e48786ddab58ab166d90767fca460 Mon Sep 17 00:00:00 2001 From: Cesar Wong Date: Tue, 14 Apr 2015 10:57:00 -0400 Subject: [PATCH] Add Connecter storage interface to API server Connecter is a type of resource that connects a request coming from the client to an internal request within the cluster. It will be used for exposing a pod's proxy, exec, and portforward endpoints. --- pkg/api/rest/rest.go | 26 +++++ pkg/apiserver/api_installer.go | 40 ++++++++ pkg/apiserver/apiserver_test.go | 175 ++++++++++++++++++++++++++++++++ pkg/apiserver/resthandler.go | 57 +++++++++-- 4 files changed, 288 insertions(+), 10 deletions(-) diff --git a/pkg/api/rest/rest.go b/pkg/api/rest/rest.go index 069ed3c2f8c..967b92eade8 100644 --- a/pkg/api/rest/rest.go +++ b/pkg/api/rest/rest.go @@ -171,6 +171,32 @@ type Redirector interface { ResourceLocation(ctx api.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error) } +// ConnectHandler is a handler for HTTP connection requests. It extends the standard +// http.Handler interface by adding a method that returns an error object if an error +// occurred during the handling of the request. +type ConnectHandler interface { + http.Handler + + // RequestError returns an error if one occurred during handling of an HTTP request + RequestError() error +} + +// Connecter is a storage object that responds to a connection request +type Connecter interface { + // Connect returns a ConnectHandler that will handle the request/response for a request + Connect(ctx api.Context, id string, options runtime.Object) (ConnectHandler, error) + + // NewConnectOptions returns an empty options object that will be used to pass + // options to the Connect method. If nil, then a nil options object is passed to + // Connect. It may return a bool and a string. If true, the value of the request + // path below the object will be included as the named string in the serialization + // of the runtime object. + NewConnectOptions() (runtime.Object, bool, string) + + // ConnectMethods returns the list of HTTP methods handled by Connect + ConnectMethods() []string +} + // ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server // instead of decoded directly. type ResourceStreamer interface { diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 10ff4c90c5f..4215ef87418 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -139,6 +139,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag patcher, isPatcher := storage.(rest.Patcher) watcher, isWatcher := storage.(rest.Watcher) _, isRedirector := storage.(rest.Redirector) + connecter, isConnecter := storage.(rest.Connecter) storageMeta, isMetadata := storage.(rest.StorageMetadata) if !isMetadata { storageMeta = defaultStorageMetadata{} @@ -193,6 +194,22 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag isGetter = true } + var ( + connectOptions runtime.Object + connectOptionsKind string + connectSubpath bool + connectSubpathKey string + ) + if isConnecter { + connectOptions, connectSubpath, connectSubpathKey = connecter.NewConnectOptions() + if connectOptions != nil { + _, connectOptionsKind, err = a.group.Typer.ObjectVersionAndKind(connectOptions) + if err != nil { + return err + } + } + } + var ctxFn ContextFunc ctxFn = func(req *restful.Request) api.Context { if ctx, ok := context.Get(req.Request); ok { @@ -238,6 +255,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector) + actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter) + actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath) } else { // v1beta3 format with namespace in path @@ -275,6 +294,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector) + actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter) + actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath) // list across namespace. namer = scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), true} @@ -315,6 +336,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector) actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector) + actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter) + actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath) } } @@ -480,6 +503,23 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + case "CONNECT": + for _, method := range connecter.ConnectMethods() { + route := ws.Method(method).Path(action.Path). + To(ConnectResource(connecter, reqScope, connectOptionsKind, connectSubpath, connectSubpathKey)). + Filter(m). + Doc("connect " + method + " requests to " + kind). + Operation("connect" + method + kind). + Produces("*/*"). + Consumes("*/*"). + Writes("string") + if connectOptions != nil { + if err := addObjectParams(ws, route, connectOptions); err != nil { + return err + } + } + ws.Route(route) + } default: return fmt.Errorf("unrecognized action verb: %s", action.Verb) } diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index a15a4dbc709..fc2d2ebc8ff 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -348,6 +348,19 @@ func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, return s, false, s.contentType, s.err } +type SimpleConnectHandler struct { + response string + err error +} + +func (h *SimpleConnectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(h.response)) +} + +func (h *SimpleConnectHandler) RequestError() error { + return h.err +} + func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) { storage.checkContext(ctx) if id == "binary" { @@ -443,6 +456,39 @@ func (storage *SimpleRESTStorage) ResourceLocation(ctx api.Context, id string) ( return &locationCopy, nil, nil } +// Implement Connecter +type ConnecterRESTStorage struct { + connectHandler rest.ConnectHandler + emptyConnectOptions runtime.Object + receivedConnectOptions runtime.Object + receivedID string + takesPath string +} + +// Implement Connecter +var _ = rest.Connecter(&ConnecterRESTStorage{}) + +func (s *ConnecterRESTStorage) New() runtime.Object { + return &Simple{} +} + +func (s *ConnecterRESTStorage) Connect(ctx api.Context, id string, options runtime.Object) (rest.ConnectHandler, error) { + s.receivedConnectOptions = options + s.receivedID = id + return s.connectHandler, nil +} + +func (s *ConnecterRESTStorage) ConnectMethods() []string { + return []string{"GET", "POST", "PUT", "DELETE"} +} + +func (s *ConnecterRESTStorage) NewConnectOptions() (runtime.Object, bool, string) { + if len(s.takesPath) > 0 { + return s.emptyConnectOptions, true, s.takesPath + } + return s.emptyConnectOptions, false, "" +} + type LegacyRESTStorage struct { *SimpleRESTStorage } @@ -1108,6 +1154,135 @@ func TestGetMissing(t *testing.T) { } } +func TestConnect(t *testing.T) { + responseText := "Hello World" + itemID := "theID" + connectStorage := &ConnecterRESTStorage{ + connectHandler: &SimpleConnectHandler{ + response: responseText, + }, + } + storage := map[string]rest.Storage{ + "simple/connect": connectStorage, + } + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect") + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", resp) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if connectStorage.receivedID != itemID { + t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID) + } + if string(body) != responseText { + t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body)) + } +} + +func TestConnectWithOptions(t *testing.T) { + responseText := "Hello World" + itemID := "theID" + connectStorage := &ConnecterRESTStorage{ + connectHandler: &SimpleConnectHandler{ + response: responseText, + }, + emptyConnectOptions: &SimpleGetOptions{}, + } + storage := map[string]rest.Storage{ + "simple/connect": connectStorage, + } + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect?param1=value1¶m2=value2") + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", resp) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if connectStorage.receivedID != itemID { + t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID) + } + if string(body) != responseText { + t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body)) + } + opts, ok := connectStorage.receivedConnectOptions.(*SimpleGetOptions) + if !ok { + t.Errorf("Unexpected options type: %#v", connectStorage.receivedConnectOptions) + } + if opts.Param1 != "value1" && opts.Param2 != "value2" { + t.Errorf("Unexpected options value: %#v", opts) + } +} + +func TestConnectWithOptionsAndPath(t *testing.T) { + responseText := "Hello World" + itemID := "theID" + testPath := "a/b/c/def" + connectStorage := &ConnecterRESTStorage{ + connectHandler: &SimpleConnectHandler{ + response: responseText, + }, + emptyConnectOptions: &SimpleGetOptions{}, + takesPath: "atAPath", + } + storage := map[string]rest.Storage{ + "simple/connect": connectStorage, + } + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect/" + testPath + "?param1=value1¶m2=value2") + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", resp) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if connectStorage.receivedID != itemID { + t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID) + } + if string(body) != responseText { + t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body)) + } + opts, ok := connectStorage.receivedConnectOptions.(*SimpleGetOptions) + if !ok { + t.Errorf("Unexpected options type: %#v", connectStorage.receivedConnectOptions) + } + if opts.Param1 != "value1" && opts.Param2 != "value2" { + t.Errorf("Unexpected options value: %#v", opts) + } + if opts.Path != testPath { + t.Errorf("Unexpected path value. Expected: %s. Actual: %s.", testPath, opts.Path) + } +} + func TestDelete(t *testing.T) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{} diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index e92f7b30a83..e3828df7df6 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -116,16 +116,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOptionsKind string, subpath bool, subpathKey string) restful.RouteFunction { return getResourceHandler(scope, func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) { - query := req.Request.URL.Query() - if subpath { - newQuery := make(url.Values) - for k, v := range query { - newQuery[k] = v - } - newQuery[subpathKey] = []string{req.PathParameter("path")} - query = newQuery - } - opts, err := queryToObject(query, scope, getOptionsKind) + opts, err := getRequestOptions(req, scope, getOptionsKind, subpath, subpathKey) if err != nil { return nil, err } @@ -133,6 +124,52 @@ func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOpt }) } +func getRequestOptions(req *restful.Request, scope RequestScope, kind string, subpath bool, subpathKey string) (runtime.Object, error) { + if len(kind) == 0 { + return nil, nil + } + query := req.Request.URL.Query() + if subpath { + newQuery := make(url.Values) + for k, v := range query { + newQuery[k] = v + } + newQuery[subpathKey] = []string{req.PathParameter("path")} + query = newQuery + } + return queryToObject(query, scope, kind) +} + +// ConnectResource returns a function that handles a connect request on a rest.Storage object. +func ConnectResource(connecter rest.Connecter, scope RequestScope, connectOptionsKind string, subpath bool, subpathKey string) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + namespace, name, err := scope.Namer.Name(req) + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + ctx := scope.ContextFunc(req) + ctx = api.WithNamespace(ctx, namespace) + opts, err := getRequestOptions(req, scope, connectOptionsKind, subpath, subpathKey) + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + handler, err := connecter.Connect(ctx, name, opts) + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + handler.ServeHTTP(w, req.Request) + err = handler.RequestError() + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + } +} + // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object. func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) {