Add Connecter storage interface to API server

Connecter is a type of resource that connects a request
coming from the client to an internal request within the cluster.
It will be used for exposing a pod's proxy, exec, and portforward
endpoints.
This commit is contained in:
Cesar Wong 2015-04-14 10:57:00 -04:00
parent a3f5dfd0e2
commit 49abf9133e
4 changed files with 288 additions and 10 deletions

View File

@ -171,6 +171,32 @@ type Redirector interface {
ResourceLocation(ctx api.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error)
}
// ConnectHandler is a handler for HTTP connection requests. It extends the standard
// http.Handler interface by adding a method that returns an error object if an error
// occurred during the handling of the request.
type ConnectHandler interface {
http.Handler
// RequestError returns an error if one occurred during handling of an HTTP request
RequestError() error
}
// Connecter is a storage object that responds to a connection request
type Connecter interface {
// Connect returns a ConnectHandler that will handle the request/response for a request
Connect(ctx api.Context, id string, options runtime.Object) (ConnectHandler, error)
// NewConnectOptions returns an empty options object that will be used to pass
// options to the Connect method. If nil, then a nil options object is passed to
// Connect. It may return a bool and a string. If true, the value of the request
// path below the object will be included as the named string in the serialization
// of the runtime object.
NewConnectOptions() (runtime.Object, bool, string)
// ConnectMethods returns the list of HTTP methods handled by Connect
ConnectMethods() []string
}
// ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server
// instead of decoded directly.
type ResourceStreamer interface {

View File

@ -139,6 +139,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
_, isRedirector := storage.(rest.Redirector)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
if !isMetadata {
storageMeta = defaultStorageMetadata{}
@ -193,6 +194,22 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
isGetter = true
}
var (
connectOptions runtime.Object
connectOptionsKind string
connectSubpath bool
connectSubpathKey string
)
if isConnecter {
connectOptions, connectSubpath, connectSubpathKey = connecter.NewConnectOptions()
if connectOptions != nil {
_, connectOptionsKind, err = a.group.Typer.ObjectVersionAndKind(connectOptions)
if err != nil {
return err
}
}
}
var ctxFn ContextFunc
ctxFn = func(req *restful.Request) api.Context {
if ctx, ok := context.Get(req.Request); ok {
@ -238,6 +255,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector)
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector)
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath)
} else {
// v1beta3 format with namespace in path
@ -275,6 +294,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector)
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector)
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath)
// list across namespace.
namer = scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), true}
@ -315,6 +336,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
actions = appendIf(actions, action{"REDIRECT", "redirect/" + itemPath, nameParams, namer}, isRedirector)
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer}, isRedirector)
actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer}, isRedirector)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", nameParams, namer}, isConnecter && connectSubpath)
}
}
@ -480,6 +503,23 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, resource, action.Params)
addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, resource, action.Params)
addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, resource, action.Params)
case "CONNECT":
for _, method := range connecter.ConnectMethods() {
route := ws.Method(method).Path(action.Path).
To(ConnectResource(connecter, reqScope, connectOptionsKind, connectSubpath, connectSubpathKey)).
Filter(m).
Doc("connect " + method + " requests to " + kind).
Operation("connect" + method + kind).
Produces("*/*").
Consumes("*/*").
Writes("string")
if connectOptions != nil {
if err := addObjectParams(ws, route, connectOptions); err != nil {
return err
}
}
ws.Route(route)
}
default:
return fmt.Errorf("unrecognized action verb: %s", action.Verb)
}

View File

@ -348,6 +348,19 @@ func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool,
return s, false, s.contentType, s.err
}
type SimpleConnectHandler struct {
response string
err error
}
func (h *SimpleConnectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(h.response))
}
func (h *SimpleConnectHandler) RequestError() error {
return h.err
}
func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) {
storage.checkContext(ctx)
if id == "binary" {
@ -443,6 +456,39 @@ func (storage *SimpleRESTStorage) ResourceLocation(ctx api.Context, id string) (
return &locationCopy, nil, nil
}
// Implement Connecter
type ConnecterRESTStorage struct {
connectHandler rest.ConnectHandler
emptyConnectOptions runtime.Object
receivedConnectOptions runtime.Object
receivedID string
takesPath string
}
// Implement Connecter
var _ = rest.Connecter(&ConnecterRESTStorage{})
func (s *ConnecterRESTStorage) New() runtime.Object {
return &Simple{}
}
func (s *ConnecterRESTStorage) Connect(ctx api.Context, id string, options runtime.Object) (rest.ConnectHandler, error) {
s.receivedConnectOptions = options
s.receivedID = id
return s.connectHandler, nil
}
func (s *ConnecterRESTStorage) ConnectMethods() []string {
return []string{"GET", "POST", "PUT", "DELETE"}
}
func (s *ConnecterRESTStorage) NewConnectOptions() (runtime.Object, bool, string) {
if len(s.takesPath) > 0 {
return s.emptyConnectOptions, true, s.takesPath
}
return s.emptyConnectOptions, false, ""
}
type LegacyRESTStorage struct {
*SimpleRESTStorage
}
@ -1108,6 +1154,135 @@ func TestGetMissing(t *testing.T) {
}
}
func TestConnect(t *testing.T) {
responseText := "Hello World"
itemID := "theID"
connectStorage := &ConnecterRESTStorage{
connectHandler: &SimpleConnectHandler{
response: responseText,
},
}
storage := map[string]rest.Storage{
"simple/connect": connectStorage,
}
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected response: %#v", resp)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if connectStorage.receivedID != itemID {
t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID)
}
if string(body) != responseText {
t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body))
}
}
func TestConnectWithOptions(t *testing.T) {
responseText := "Hello World"
itemID := "theID"
connectStorage := &ConnecterRESTStorage{
connectHandler: &SimpleConnectHandler{
response: responseText,
},
emptyConnectOptions: &SimpleGetOptions{},
}
storage := map[string]rest.Storage{
"simple/connect": connectStorage,
}
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect?param1=value1&param2=value2")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected response: %#v", resp)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if connectStorage.receivedID != itemID {
t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID)
}
if string(body) != responseText {
t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body))
}
opts, ok := connectStorage.receivedConnectOptions.(*SimpleGetOptions)
if !ok {
t.Errorf("Unexpected options type: %#v", connectStorage.receivedConnectOptions)
}
if opts.Param1 != "value1" && opts.Param2 != "value2" {
t.Errorf("Unexpected options value: %#v", opts)
}
}
func TestConnectWithOptionsAndPath(t *testing.T) {
responseText := "Hello World"
itemID := "theID"
testPath := "a/b/c/def"
connectStorage := &ConnecterRESTStorage{
connectHandler: &SimpleConnectHandler{
response: responseText,
},
emptyConnectOptions: &SimpleGetOptions{},
takesPath: "atAPath",
}
storage := map[string]rest.Storage{
"simple/connect": connectStorage,
}
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL + "/api/version/simple/" + itemID + "/connect/" + testPath + "?param1=value1&param2=value2")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected response: %#v", resp)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if connectStorage.receivedID != itemID {
t.Errorf("Unexpected item id. Expected: %s. Actual: %s.", itemID, connectStorage.receivedID)
}
if string(body) != responseText {
t.Errorf("Unexpected response. Expected: %s. Actual: %s.", responseText, string(body))
}
opts, ok := connectStorage.receivedConnectOptions.(*SimpleGetOptions)
if !ok {
t.Errorf("Unexpected options type: %#v", connectStorage.receivedConnectOptions)
}
if opts.Param1 != "value1" && opts.Param2 != "value2" {
t.Errorf("Unexpected options value: %#v", opts)
}
if opts.Path != testPath {
t.Errorf("Unexpected path value. Expected: %s. Actual: %s.", testPath, opts.Path)
}
}
func TestDelete(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{}

View File

@ -116,16 +116,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOptionsKind string, subpath bool, subpathKey string) restful.RouteFunction {
return getResourceHandler(scope,
func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) {
query := req.Request.URL.Query()
if subpath {
newQuery := make(url.Values)
for k, v := range query {
newQuery[k] = v
}
newQuery[subpathKey] = []string{req.PathParameter("path")}
query = newQuery
}
opts, err := queryToObject(query, scope, getOptionsKind)
opts, err := getRequestOptions(req, scope, getOptionsKind, subpath, subpathKey)
if err != nil {
return nil, err
}
@ -133,6 +124,52 @@ func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOpt
})
}
func getRequestOptions(req *restful.Request, scope RequestScope, kind string, subpath bool, subpathKey string) (runtime.Object, error) {
if len(kind) == 0 {
return nil, nil
}
query := req.Request.URL.Query()
if subpath {
newQuery := make(url.Values)
for k, v := range query {
newQuery[k] = v
}
newQuery[subpathKey] = []string{req.PathParameter("path")}
query = newQuery
}
return queryToObject(query, scope, kind)
}
// ConnectResource returns a function that handles a connect request on a rest.Storage object.
func ConnectResource(connecter rest.Connecter, scope RequestScope, connectOptionsKind string, subpath bool, subpathKey string) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
namespace, name, err := scope.Namer.Name(req)
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
ctx := scope.ContextFunc(req)
ctx = api.WithNamespace(ctx, namespace)
opts, err := getRequestOptions(req, scope, connectOptionsKind, subpath, subpathKey)
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
handler, err := connecter.Connect(ctx, name, opts)
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
handler.ServeHTTP(w, req.Request)
err = handler.RequestError()
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
}
}
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {