mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Add GetterWithOptions and allow stream flushing
In addition to Getter interface, API Installer now supports a GetterWithOptions interface that takes an additional options object when getting a resource. A flag is now returned from rest.ResourceStreamer that indicates whether the streamed response should be flushed when written back to the client. This is to support log streaming.
This commit is contained in:
parent
6dba6aa178
commit
efc7f86baf
@ -28,7 +28,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// RESTStorage is a generic interface for RESTful storage services.
|
||||
// Storage is a generic interface for RESTful storage services.
|
||||
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
|
||||
// that objects may implement any of the below interfaces.
|
||||
type Storage interface {
|
||||
@ -54,6 +54,21 @@ type Getter interface {
|
||||
Get(ctx api.Context, name string) (runtime.Object, error)
|
||||
}
|
||||
|
||||
// GetterWithOptions is an object that retrieve a named RESTful resource and takes
|
||||
// additional options on the get request
|
||||
type GetterWithOptions interface {
|
||||
// Get finds a resource in the storage by name and returns it.
|
||||
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
|
||||
// returned error value err when the specified resource is not found.
|
||||
// The options object passed to it is of the same type returned by the NewGetOptions
|
||||
// method.
|
||||
Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error)
|
||||
|
||||
// NewGetOptions returns an empty options object that will be used to pass
|
||||
// options to the Get method.
|
||||
NewGetOptions() runtime.Object
|
||||
}
|
||||
|
||||
// Deleter is an object that can delete a named RESTful resource.
|
||||
type Deleter interface {
|
||||
// Delete finds a resource in the storage and deletes it.
|
||||
@ -119,6 +134,7 @@ type CreaterUpdater interface {
|
||||
// CreaterUpdater must satisfy the Updater interface.
|
||||
var _ Updater = CreaterUpdater(nil)
|
||||
|
||||
// Patcher is a storage object that supports both get and update.
|
||||
type Patcher interface {
|
||||
Getter
|
||||
Updater
|
||||
@ -153,11 +169,12 @@ type Redirector interface {
|
||||
// ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server
|
||||
// instead of decoded directly.
|
||||
type ResourceStreamer interface {
|
||||
// InputStream should return an io.Reader if the provided object supports streaming. The desired
|
||||
// InputStream should return an io.ReadCloser if the provided object supports streaming. The desired
|
||||
// api version and a accept header (may be empty) are passed to the call. If no error occurs,
|
||||
// the caller may return a content type string with the reader that indicates the type of the
|
||||
// stream.
|
||||
InputStream(apiVersion, acceptHeader string) (io.ReadCloser, string, error)
|
||||
// the caller may return a flag indicating whether the result should be flushed as writes occur
|
||||
// and a content type string that indicates the type of the stream.
|
||||
// If a null stream is returned, a StatusNoContent response wil be generated.
|
||||
InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error)
|
||||
}
|
||||
|
||||
// StorageMetadata is an optional interface that callers can implement to provide additional
|
||||
|
@ -130,6 +130,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
creater, isCreater := storage.(rest.Creater)
|
||||
lister, isLister := storage.(rest.Lister)
|
||||
getter, isGetter := storage.(rest.Getter)
|
||||
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
|
||||
deleter, isDeleter := storage.(rest.Deleter)
|
||||
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
|
||||
updater, isUpdater := storage.(rest.Updater)
|
||||
@ -170,6 +171,17 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
gracefulDeleter = rest.GracefulDeleteAdapter{deleter}
|
||||
}
|
||||
|
||||
var getOptions runtime.Object
|
||||
var getOptionsKind string
|
||||
if isGetterWithOptions {
|
||||
getOptions = getterWithOptions.NewGetOptions()
|
||||
_, getOptionsKind, err = a.group.Typer.ObjectVersionAndKind(getOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
isGetter = true
|
||||
}
|
||||
|
||||
var ctxFn ContextFunc
|
||||
ctxFn = func(req *restful.Request) api.Context {
|
||||
if ctx, ok := context.Get(req.Request); ok {
|
||||
@ -316,12 +328,23 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
m := monitorFilter(action.Verb, resource)
|
||||
switch action.Verb {
|
||||
case "GET": // Get a resource.
|
||||
route := ws.GET(action.Path).To(GetResource(getter, reqScope)).
|
||||
var handler restful.RouteFunction
|
||||
if isGetterWithOptions {
|
||||
handler = GetResourceWithOptions(getterWithOptions, reqScope, getOptionsKind)
|
||||
} else {
|
||||
handler = GetResource(getter, reqScope)
|
||||
}
|
||||
route := ws.GET(action.Path).To(handler).
|
||||
Filter(m).
|
||||
Doc("read the specified " + kind).
|
||||
Operation("read" + kind).
|
||||
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
|
||||
Writes(versionedObject)
|
||||
if isGetterWithOptions {
|
||||
if err := addObjectParams(ws, route, getOptions); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
addParams(route, action.Params)
|
||||
ws.Route(route)
|
||||
case "LIST": // List all resources of a kind.
|
||||
|
@ -36,6 +36,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/flushwriter"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
@ -204,18 +205,27 @@ func APIVersionHandler(versions ...string) restful.RouteFunction {
|
||||
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
|
||||
func write(statusCode int, apiVersion string, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, req *http.Request) {
|
||||
if stream, ok := object.(rest.ResourceStreamer); ok {
|
||||
out, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept"))
|
||||
out, flush, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept"))
|
||||
if err != nil {
|
||||
errorJSONFatal(err, codec, w)
|
||||
return
|
||||
}
|
||||
if out == nil {
|
||||
// No output provided - return StatusNoContent
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
defer out.Close()
|
||||
if len(contentType) == 0 {
|
||||
contentType = "application/octet-stream"
|
||||
}
|
||||
w.Header().Set("Content-Type", contentType)
|
||||
w.WriteHeader(statusCode)
|
||||
io.Copy(w, out)
|
||||
writer := w.(io.Writer)
|
||||
if flush {
|
||||
writer = flushwriter.Wrap(w)
|
||||
}
|
||||
io.Copy(writer, out)
|
||||
return
|
||||
}
|
||||
writeJSON(statusCode, codec, object, w)
|
||||
|
@ -114,13 +114,17 @@ func init() {
|
||||
// api.Status is returned in errors
|
||||
|
||||
// "internal" version
|
||||
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{})
|
||||
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{}, &SimpleGetOptions{})
|
||||
// "version" version
|
||||
// TODO: Use versioned api objects?
|
||||
api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{})
|
||||
api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{}, &SimpleGetOptions{})
|
||||
// "version2" version
|
||||
// TODO: Use versioned api objects?
|
||||
api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{})
|
||||
api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{}, &SimpleGetOptions{})
|
||||
|
||||
// Register SimpleGetOptions with the server versions to convert query params to it
|
||||
api.Scheme.AddKnownTypes("v1beta1", &SimpleGetOptions{})
|
||||
api.Scheme.AddKnownTypes("v1beta3", &SimpleGetOptions{})
|
||||
|
||||
nsMapper := newMapper()
|
||||
legacyNsMapper := newMapper()
|
||||
@ -231,6 +235,14 @@ type Simple struct {
|
||||
|
||||
func (*Simple) IsAnAPIObject() {}
|
||||
|
||||
type SimpleGetOptions struct {
|
||||
api.TypeMeta `json:",inline"`
|
||||
Param1 string `json:"param1"`
|
||||
Param2 string `json:"param2"`
|
||||
}
|
||||
|
||||
func (*SimpleGetOptions) IsAnAPIObject() {}
|
||||
|
||||
type SimpleList struct {
|
||||
api.TypeMeta `json:",inline"`
|
||||
api.ListMeta `json:"metadata,inline"`
|
||||
@ -254,6 +266,21 @@ func TestSimpleSetupRight(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSimpleOptionsSetupRight(t *testing.T) {
|
||||
s := &SimpleGetOptions{}
|
||||
wire, err := codec.Encode(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s2, err := codec.Decode(wire)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(s, s2) {
|
||||
t.Fatalf("encode/decode broken:\n%#v\n%#v\n", s, s2)
|
||||
}
|
||||
}
|
||||
|
||||
type SimpleRESTStorage struct {
|
||||
errors map[string]error
|
||||
list []Simple
|
||||
@ -314,10 +341,10 @@ func (s *SimpleStream) Close() error {
|
||||
|
||||
func (s *SimpleStream) IsAnAPIObject() {}
|
||||
|
||||
func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, string, error) {
|
||||
func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, string, error) {
|
||||
s.version = version
|
||||
s.accept = accept
|
||||
return s, s.contentType, s.err
|
||||
return s, false, s.contentType, s.err
|
||||
}
|
||||
|
||||
func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||
@ -432,6 +459,23 @@ func (m *MetadataRESTStorage) ProducesMIMETypes(method string) []string {
|
||||
return m.types
|
||||
}
|
||||
|
||||
type GetWithOptionsRESTStorage struct {
|
||||
*SimpleRESTStorage
|
||||
optionsReceived runtime.Object
|
||||
}
|
||||
|
||||
func (r *GetWithOptionsRESTStorage) Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error) {
|
||||
if _, ok := options.(*SimpleGetOptions); !ok {
|
||||
return nil, fmt.Errorf("Unexpected options object: %#v", options)
|
||||
}
|
||||
r.optionsReceived = options
|
||||
return r.SimpleRESTStorage.Get(ctx, name)
|
||||
}
|
||||
|
||||
func (r *GetWithOptionsRESTStorage) NewGetOptions() runtime.Object {
|
||||
return &SimpleGetOptions{}
|
||||
}
|
||||
|
||||
func extractBody(response *http.Response, object runtime.Object) (string, error) {
|
||||
defer response.Body.Close()
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
@ -878,6 +922,47 @@ func TestGetBinary(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWithOptions(t *testing.T) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := GetWithOptionsRESTStorage{
|
||||
SimpleRESTStorage: &SimpleRESTStorage{
|
||||
item: Simple{
|
||||
Other: "foo",
|
||||
},
|
||||
},
|
||||
}
|
||||
storage["simple"] = &simpleStorage
|
||||
handler := handle(storage)
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
resp, err := http.Get(server.URL + "/api/version/simple/id?param1=test1¶m2=test2")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected response: %#v", resp)
|
||||
}
|
||||
var itemOut Simple
|
||||
body, err := extractBody(resp, &itemOut)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if itemOut.Name != simpleStorage.item.Name {
|
||||
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
|
||||
}
|
||||
|
||||
opts, ok := simpleStorage.optionsReceived.(*SimpleGetOptions)
|
||||
if !ok {
|
||||
t.Errorf("Unexpected options object received: %#v", simpleStorage.optionsReceived)
|
||||
return
|
||||
}
|
||||
if opts.Param1 != "test1" || opts.Param2 != "test2" {
|
||||
t.Errorf("Did not receive expected options: %#v", opts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAlternateSelfLink(t *testing.T) {
|
||||
storage := map[string]rest.Storage{}
|
||||
simpleStorage := SimpleRESTStorage{
|
||||
|
@ -74,8 +74,13 @@ type RequestScope struct {
|
||||
ServerAPIVersion string
|
||||
}
|
||||
|
||||
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
|
||||
func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
|
||||
// getterFunc performs a get request with the given context and object name. The request
|
||||
// may be used to deserialize an options object to pass to the getter.
|
||||
type getterFunc func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error)
|
||||
|
||||
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
|
||||
// passed-in getterFunc to perform the actual get.
|
||||
func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunction {
|
||||
return func(req *restful.Request, res *restful.Response) {
|
||||
w := res.ResponseWriter
|
||||
namespace, name, err := scope.Namer.Name(req)
|
||||
@ -86,7 +91,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
|
||||
ctx := scope.ContextFunc(req)
|
||||
ctx = api.WithNamespace(ctx, namespace)
|
||||
|
||||
result, err := r.Get(ctx, name)
|
||||
result, err := getter(ctx, name, req)
|
||||
if err != nil {
|
||||
errorJSON(err, scope.Codec, w)
|
||||
return
|
||||
@ -99,6 +104,26 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
|
||||
}
|
||||
}
|
||||
|
||||
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
|
||||
func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
|
||||
return getResourceHandler(scope,
|
||||
func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) {
|
||||
return r.Get(ctx, name)
|
||||
})
|
||||
}
|
||||
|
||||
// GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object.
|
||||
func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOptionsKind string) restful.RouteFunction {
|
||||
return getResourceHandler(scope,
|
||||
func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) {
|
||||
opts, err := queryToObject(req.Request.URL.Query(), scope, getOptionsKind)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.Get(ctx, name, opts)
|
||||
})
|
||||
}
|
||||
|
||||
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
|
||||
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction {
|
||||
return func(req *restful.Request, res *restful.Response) {
|
||||
|
Loading…
Reference in New Issue
Block a user