From efc7f86baf77c722b17da2a1d9f70bd11c0b8a80 Mon Sep 17 00:00:00 2001 From: Cesar Wong Date: Mon, 6 Apr 2015 12:58:00 -0400 Subject: [PATCH] Add GetterWithOptions and allow stream flushing In addition to Getter interface, API Installer now supports a GetterWithOptions interface that takes an additional options object when getting a resource. A flag is now returned from rest.ResourceStreamer that indicates whether the streamed response should be flushed when written back to the client. This is to support log streaming. --- pkg/api/rest/rest.go | 27 ++++++++-- pkg/apiserver/api_installer.go | 25 ++++++++- pkg/apiserver/apiserver.go | 14 ++++- pkg/apiserver/apiserver_test.go | 95 +++++++++++++++++++++++++++++++-- pkg/apiserver/resthandler.go | 31 +++++++++-- 5 files changed, 176 insertions(+), 16 deletions(-) diff --git a/pkg/api/rest/rest.go b/pkg/api/rest/rest.go index 14e434c7749..65ee1c81b25 100644 --- a/pkg/api/rest/rest.go +++ b/pkg/api/rest/rest.go @@ -28,7 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -// RESTStorage is a generic interface for RESTful storage services. +// Storage is a generic interface for RESTful storage services. // Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected // that objects may implement any of the below interfaces. type Storage interface { @@ -54,6 +54,21 @@ type Getter interface { Get(ctx api.Context, name string) (runtime.Object, error) } +// GetterWithOptions is an object that retrieve a named RESTful resource and takes +// additional options on the get request +type GetterWithOptions interface { + // Get finds a resource in the storage by name and returns it. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. + // The options object passed to it is of the same type returned by the NewGetOptions + // method. + Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error) + + // NewGetOptions returns an empty options object that will be used to pass + // options to the Get method. + NewGetOptions() runtime.Object +} + // Deleter is an object that can delete a named RESTful resource. type Deleter interface { // Delete finds a resource in the storage and deletes it. @@ -119,6 +134,7 @@ type CreaterUpdater interface { // CreaterUpdater must satisfy the Updater interface. var _ Updater = CreaterUpdater(nil) +// Patcher is a storage object that supports both get and update. type Patcher interface { Getter Updater @@ -153,11 +169,12 @@ type Redirector interface { // ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server // instead of decoded directly. type ResourceStreamer interface { - // InputStream should return an io.Reader if the provided object supports streaming. The desired + // InputStream should return an io.ReadCloser if the provided object supports streaming. The desired // api version and a accept header (may be empty) are passed to the call. If no error occurs, - // the caller may return a content type string with the reader that indicates the type of the - // stream. - InputStream(apiVersion, acceptHeader string) (io.ReadCloser, string, error) + // the caller may return a flag indicating whether the result should be flushed as writes occur + // and a content type string that indicates the type of the stream. + // If a null stream is returned, a StatusNoContent response wil be generated. + InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error) } // StorageMetadata is an optional interface that callers can implement to provide additional diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 25d73fbb3b7..92c861d5624 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -130,6 +130,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag creater, isCreater := storage.(rest.Creater) lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) + getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions) deleter, isDeleter := storage.(rest.Deleter) gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) updater, isUpdater := storage.(rest.Updater) @@ -170,6 +171,17 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag gracefulDeleter = rest.GracefulDeleteAdapter{deleter} } + var getOptions runtime.Object + var getOptionsKind string + if isGetterWithOptions { + getOptions = getterWithOptions.NewGetOptions() + _, getOptionsKind, err = a.group.Typer.ObjectVersionAndKind(getOptions) + if err != nil { + return err + } + isGetter = true + } + var ctxFn ContextFunc ctxFn = func(req *restful.Request) api.Context { if ctx, ok := context.Get(req.Request); ok { @@ -316,12 +328,23 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag m := monitorFilter(action.Verb, resource) switch action.Verb { case "GET": // Get a resource. - route := ws.GET(action.Path).To(GetResource(getter, reqScope)). + var handler restful.RouteFunction + if isGetterWithOptions { + handler = GetResourceWithOptions(getterWithOptions, reqScope, getOptionsKind) + } else { + handler = GetResource(getter, reqScope) + } + route := ws.GET(action.Path).To(handler). Filter(m). Doc("read the specified " + kind). Operation("read" + kind). Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). Writes(versionedObject) + if isGetterWithOptions { + if err := addObjectParams(ws, route, getOptions); err != nil { + return err + } + } addParams(route, action.Params) ws.Route(route) case "LIST": // List all resources of a kind. diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a3d0b56976d..bcfc1551cfd 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -36,6 +36,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/flushwriter" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/emicklei/go-restful" @@ -204,18 +205,27 @@ func APIVersionHandler(versions ...string) restful.RouteFunction { // be "application/octet-stream". All other objects are sent to standard JSON serialization. func write(statusCode int, apiVersion string, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, req *http.Request) { if stream, ok := object.(rest.ResourceStreamer); ok { - out, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept")) + out, flush, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept")) if err != nil { errorJSONFatal(err, codec, w) return } + if out == nil { + // No output provided - return StatusNoContent + w.WriteHeader(http.StatusNoContent) + return + } defer out.Close() if len(contentType) == 0 { contentType = "application/octet-stream" } w.Header().Set("Content-Type", contentType) w.WriteHeader(statusCode) - io.Copy(w, out) + writer := w.(io.Writer) + if flush { + writer = flushwriter.Wrap(w) + } + io.Copy(writer, out) return } writeJSON(statusCode, codec, object, w) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 5631b4df79d..ecc3a6af0b5 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -114,13 +114,17 @@ func init() { // api.Status is returned in errors // "internal" version - api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{}) + api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{}, &SimpleGetOptions{}) // "version" version // TODO: Use versioned api objects? - api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{}) + api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{}, &SimpleGetOptions{}) // "version2" version // TODO: Use versioned api objects? - api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{}) + api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{}, &SimpleGetOptions{}) + + // Register SimpleGetOptions with the server versions to convert query params to it + api.Scheme.AddKnownTypes("v1beta1", &SimpleGetOptions{}) + api.Scheme.AddKnownTypes("v1beta3", &SimpleGetOptions{}) nsMapper := newMapper() legacyNsMapper := newMapper() @@ -231,6 +235,14 @@ type Simple struct { func (*Simple) IsAnAPIObject() {} +type SimpleGetOptions struct { + api.TypeMeta `json:",inline"` + Param1 string `json:"param1"` + Param2 string `json:"param2"` +} + +func (*SimpleGetOptions) IsAnAPIObject() {} + type SimpleList struct { api.TypeMeta `json:",inline"` api.ListMeta `json:"metadata,inline"` @@ -254,6 +266,21 @@ func TestSimpleSetupRight(t *testing.T) { } } +func TestSimpleOptionsSetupRight(t *testing.T) { + s := &SimpleGetOptions{} + wire, err := codec.Encode(s) + if err != nil { + t.Fatal(err) + } + s2, err := codec.Decode(wire) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(s, s2) { + t.Fatalf("encode/decode broken:\n%#v\n%#v\n", s, s2) + } +} + type SimpleRESTStorage struct { errors map[string]error list []Simple @@ -314,10 +341,10 @@ func (s *SimpleStream) Close() error { func (s *SimpleStream) IsAnAPIObject() {} -func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, string, error) { +func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, string, error) { s.version = version s.accept = accept - return s, s.contentType, s.err + return s, false, s.contentType, s.err } func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) { @@ -432,6 +459,23 @@ func (m *MetadataRESTStorage) ProducesMIMETypes(method string) []string { return m.types } +type GetWithOptionsRESTStorage struct { + *SimpleRESTStorage + optionsReceived runtime.Object +} + +func (r *GetWithOptionsRESTStorage) Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error) { + if _, ok := options.(*SimpleGetOptions); !ok { + return nil, fmt.Errorf("Unexpected options object: %#v", options) + } + r.optionsReceived = options + return r.SimpleRESTStorage.Get(ctx, name) +} + +func (r *GetWithOptionsRESTStorage) NewGetOptions() runtime.Object { + return &SimpleGetOptions{} +} + func extractBody(response *http.Response, object runtime.Object) (string, error) { defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) @@ -878,6 +922,47 @@ func TestGetBinary(t *testing.T) { } } +func TestGetWithOptions(t *testing.T) { + storage := map[string]rest.Storage{} + simpleStorage := GetWithOptionsRESTStorage{ + SimpleRESTStorage: &SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + }, + } + storage["simple"] = &simpleStorage + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/id?param1=test1¶m2=test2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + + opts, ok := simpleStorage.optionsReceived.(*SimpleGetOptions) + if !ok { + t.Errorf("Unexpected options object received: %#v", simpleStorage.optionsReceived) + return + } + if opts.Param1 != "test1" || opts.Param2 != "test2" { + t.Errorf("Did not receive expected options: %#v", opts) + } +} + func TestGetAlternateSelfLink(t *testing.T) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{ diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 3e6b211a6fc..bd3d351c6f6 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -74,8 +74,13 @@ type RequestScope struct { ServerAPIVersion string } -// GetResource returns a function that handles retrieving a single resource from a rest.Storage object. -func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { +// getterFunc performs a get request with the given context and object name. The request +// may be used to deserialize an options object to pass to the getter. +type getterFunc func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) + +// getResourceHandler is an HTTP handler function for get requests. It delegates to the +// passed-in getterFunc to perform the actual get. +func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { w := res.ResponseWriter namespace, name, err := scope.Namer.Name(req) @@ -86,7 +91,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { ctx := scope.ContextFunc(req) ctx = api.WithNamespace(ctx, namespace) - result, err := r.Get(ctx, name) + result, err := getter(ctx, name, req) if err != nil { errorJSON(err, scope.Codec, w) return @@ -99,6 +104,26 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { } } +// GetResource returns a function that handles retrieving a single resource from a rest.Storage object. +func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { + return getResourceHandler(scope, + func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) { + return r.Get(ctx, name) + }) +} + +// GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object. +func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOptionsKind string) restful.RouteFunction { + return getResourceHandler(scope, + func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) { + opts, err := queryToObject(req.Request.URL.Query(), scope, getOptionsKind) + if err != nil { + return nil, err + } + return r.Get(ctx, name, opts) + }) +} + // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object. func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) {