Merge pull request #5763 from smarterclayton/get_input_parameters_versioned

Expose versioned query parameters and make watch an operation on List
This commit is contained in:
Brian Grant
2015-03-27 14:35:23 -07:00
33 changed files with 538 additions and 226 deletions

View File

@@ -29,7 +29,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
"github.com/emicklei/go-restful"
)
@@ -58,13 +60,6 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) {
// Create the WebService.
ws = a.newWebService()
// Initialize the custom handlers.
watchHandler := (&WatchHandler{
storage: a.group.Storage,
codec: a.group.Codec,
linker: a.group.Linker,
info: a.info,
})
redirectHandler := (&RedirectHandler{a.group.Storage, a.group.Codec, a.group.Context, a.info})
proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info})
@@ -77,7 +72,7 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) {
}
sort.Strings(paths)
for _, path := range paths {
if err := a.registerResourceHandlers(path, a.group.Storage[path], ws, watchHandler, redirectHandler, proxyHandler); err != nil {
if err := a.registerResourceHandlers(path, a.group.Storage[path], ws, redirectHandler, proxyHandler); err != nil {
errors = append(errors, err)
}
}
@@ -95,10 +90,15 @@ func (a *APIInstaller) newWebService() *restful.WebService {
return ws
}
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, watchHandler, redirectHandler, proxyHandler http.Handler) error {
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, redirectHandler, proxyHandler http.Handler) error {
admit := a.group.Admit
context := a.group.Context
serverVersion := a.group.ServerVersion
if len(serverVersion) == 0 {
serverVersion = a.group.Version
}
var resource, subresource string
switch parts := strings.Split(path, "/"); len(parts) {
case 2:
@@ -121,17 +121,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
versionedObject := indirectArbitraryPointer(versionedPtr)
var versionedList interface{}
if lister, ok := storage.(rest.Lister); ok {
list := lister.NewList()
_, listKind, err := a.group.Typer.ObjectVersionAndKind(list)
versionedListPtr, err := a.group.Creater.New(a.group.Version, listKind)
if err != nil {
return err
}
versionedList = indirectArbitraryPointer(versionedListPtr)
}
mapping, err := a.group.Mapper.RESTMapping(kind, a.group.Version)
if err != nil {
return err
@@ -145,17 +134,33 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
_, isWatcher := storage.(rest.Watcher)
watcher, isWatcher := storage.(rest.Watcher)
_, isRedirector := storage.(rest.Redirector)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
if !isMetadata {
storageMeta = defaultStorageMetadata{}
}
var versionedList interface{}
if isLister {
list := lister.NewList()
_, listKind, err := a.group.Typer.ObjectVersionAndKind(list)
versionedListPtr, err := a.group.Creater.New(a.group.Version, listKind)
if err != nil {
return err
}
versionedList = indirectArbitraryPointer(versionedListPtr)
}
versionedListOptions, err := a.group.Creater.New(serverVersion, "ListOptions")
if err != nil {
return err
}
var versionedDeleterObject runtime.Object
switch {
case isGracefulDeleter:
object, err := a.group.Creater.New(a.group.Version, "DeleteOptions")
object, err := a.group.Creater.New(serverVersion, "DeleteOptions")
if err != nil {
return err
}
@@ -288,11 +293,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
// test/integration/auth_test.go is currently the most comprehensive status code test
reqScope := RequestScope{
ContextFunc: ctxFn,
Codec: mapping.Codec,
APIVersion: a.group.Version,
Resource: resource,
Kind: kind,
ContextFunc: ctxFn,
Creater: a.group.Creater,
Convertor: a.group.Convertor,
Codec: mapping.Codec,
APIVersion: a.group.Version,
ServerAPIVersion: serverVersion,
Resource: resource,
Kind: kind,
}
for _, action := range actions {
reqScope.Namer = action.Namer
@@ -308,12 +316,15 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
addParams(route, action.Params)
ws.Route(route)
case "LIST": // List all resources of a kind.
route := ws.GET(action.Path).To(ListResource(lister, reqScope)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false)).
Filter(m).
Doc("list objects of kind " + kind).
Operation("list" + kind).
Produces("application/json").
Writes(versionedList)
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return err
}
addParams(route, action.Params)
ws.Route(route)
case "PUT": // Update a resource.
@@ -356,22 +367,30 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
addParams(route, action.Params)
ws.Route(route)
// TODO: deprecated
case "WATCH": // Watch a resource.
route := ws.GET(action.Path).To(routeFunction(watchHandler)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
Filter(m).
Doc("watch a particular " + kind).
Doc("watch changes to an object of kind " + kind).
Operation("watch" + kind).
Produces("application/json").
Writes(versionedObject)
Writes(watchjson.NewWatchEvent())
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return err
}
addParams(route, action.Params)
ws.Route(route)
// TODO: deprecated
case "WATCHLIST": // Watch all resources of a kind.
route := ws.GET(action.Path).To(routeFunction(watchHandler)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
Filter(m).
Doc("watch a list of " + kind).
Doc("watch individual changes to a list of " + kind).
Operation("watch" + kind + "list").
Produces("application/json").
Writes(versionedList)
Writes(watchjson.NewWatchEvent())
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return err
}
addParams(route, action.Params)
ws.Route(route)
case "REDIRECT": // Get the redirect URL for a resource.
@@ -651,6 +670,45 @@ func addParams(route *restful.RouteBuilder, params []*restful.Parameter) {
}
}
// addObjectParams converts a runtime.Object into a set of go-restful Param() definitions on the route.
// The object must be a pointer to a struct; only fields at the top level of the struct that are not
// themselves interfaces or structs are used; only fields with a json tag that is non empty (the standard
// Go JSON behavior for omitting a field) become query parameters. The name of the query parameter is
// the JSON field name. If a description struct tag is set on the field, that description is used on the
// query parameter. In essence, it converts a standard JSON top level object into a query param schema.
func addObjectParams(ws *restful.WebService, route *restful.RouteBuilder, obj runtime.Object) error {
sv, err := conversion.EnforcePtr(obj)
if err != nil {
return err
}
st := sv.Type()
switch st.Kind() {
case reflect.Struct:
for i := 0; i < st.NumField(); i++ {
name := st.Field(i).Name
sf, ok := st.FieldByName(name)
if !ok {
continue
}
switch sf.Type.Kind() {
case reflect.Interface, reflect.Struct:
default:
jsonTag := sf.Tag.Get("json")
if len(jsonTag) == 0 {
continue
}
jsonName := strings.SplitN(jsonTag, ",", 2)[0]
if len(jsonName) == 0 {
continue
}
desc := sf.Tag.Get("description")
route.Param(ws.QueryParameter(jsonName, desc).DataType(sf.Type.Name()))
}
}
}
return nil
}
// defaultStorageMetadata provides default answers to rest.StorageMetadata.
type defaultStorageMetadata struct{}

View File

@@ -102,12 +102,19 @@ type APIGroupVersion struct {
Root string
Version string
// ServerVersion controls the Kubernetes APIVersion used for common objects in the apiserver
// schema like api.Status, api.DeleteOptions, and api.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1beta3" internal objects. If
// empty, defaults to Version.
ServerVersion string
Mapper meta.RESTMapper
Codec runtime.Codec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater
Linker runtime.SelfLinker
Codec runtime.Codec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Linker runtime.SelfLinker
Admit admission.Interface
Context api.RequestContextMapper
@@ -197,7 +204,11 @@ func APIVersionHandler(versions ...string) restful.RouteFunction {
}
}
// write renders a returned runtime.Object to the response as a stream or an encoded object.
// write renders a returned runtime.Object to the response as a stream or an encoded object. If the object
// returned by the response implements rest.ResourceStreamer that interface will be used to render the
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func write(statusCode int, apiVersion string, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, req *http.Request) {
if stream, ok := object.(rest.ResourceStreamer); ok {
out, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept"))

View File

@@ -37,6 +37,8 @@ import (
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta3"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@@ -53,11 +55,21 @@ func convert(obj runtime.Object) (runtime.Object, error) {
return obj, nil
}
// This creates a fake API version, similar to api/latest.go
// This creates a fake API version, similar to api/latest.go for a v1beta1 equivalent api. It is distinct
// from the Kubernetes API versions to allow clients to properly distinguish the two.
const testVersion = "version"
var versions = []string{testVersion}
var codec = runtime.CodecFor(api.Scheme, testVersion)
// The equivalent of the Kubernetes v1beta3 API.
const testVersion2 = "version2"
var versions = []string{testVersion, testVersion2}
var legacyCodec = runtime.CodecFor(api.Scheme, testVersion)
var codec = runtime.CodecFor(api.Scheme, testVersion2)
// these codecs reflect ListOptions/DeleteOptions coming from the serverAPIversion
var versionServerCodec = runtime.CodecFor(api.Scheme, "v1beta1")
var version2ServerCodec = runtime.CodecFor(api.Scheme, "v1beta3")
var accessor = meta.NewAccessor()
var versioner runtime.ResourceVersioner = accessor
var selfLinker runtime.SelfLinker = accessor
@@ -68,6 +80,12 @@ var requestContextMapper api.RequestContextMapper
func interfacesFor(version string) (*meta.VersionInterfaces, error) {
switch version {
case testVersion:
return &meta.VersionInterfaces{
Codec: legacyCodec,
ObjectConvertor: api.Scheme,
MetadataAccessor: accessor,
}, nil
case testVersion2:
return &meta.VersionInterfaces{
Codec: codec,
ObjectConvertor: api.Scheme,
@@ -96,11 +114,13 @@ func init() {
// api.Status is returned in errors
// "internal" version
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{},
&api.Status{})
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{})
// "version" version
// TODO: Use versioned api objects?
api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &api.DeleteOptions{}, &api.Status{})
api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{})
// "version2" version
// TODO: Use versioned api objects?
api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{})
nsMapper := newMapper()
legacyNsMapper := newMapper()
@@ -118,6 +138,18 @@ func init() {
namespaceMapper = nsMapper
admissionControl = admit.NewAlwaysAdmit()
requestContextMapper = api.NewRequestContextMapper()
//mapper.(*meta.DefaultRESTMapper).Add(meta.RESTScopeNamespaceLegacy, "Simple", testVersion, false)
api.Scheme.AddFieldLabelConversionFunc(testVersion, "Simple",
func(label, value string) (string, string, error) {
return label, value, nil
},
)
api.Scheme.AddFieldLabelConversionFunc(testVersion2, "Simple",
func(label, value string) (string, string, error) {
return label, value, nil
},
)
}
// defaultAPIServer exposes nested objects for testability.
@@ -129,45 +161,61 @@ type defaultAPIServer struct {
// uses the default settings
func handle(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, admissionControl, mapper, selfLinker)
return handleInternal(true, storage, admissionControl, selfLinker)
}
// uses the default settings for a v1beta3 compatible api
func handleNew(storage map[string]rest.Storage) http.Handler {
return handleInternal(false, storage, admissionControl, selfLinker)
}
// tests with a deny admission controller
func handleDeny(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, deny.NewAlwaysDeny(), mapper, selfLinker)
return handleInternal(true, storage, deny.NewAlwaysDeny(), selfLinker)
}
// tests using the new namespace scope mechanism
func handleNamespaced(storage map[string]rest.Storage) http.Handler {
return handleInternal(storage, admissionControl, namespaceMapper, selfLinker)
return handleInternal(false, storage, admissionControl, selfLinker)
}
// tests using a custom self linker
func handleLinker(storage map[string]rest.Storage, selfLinker runtime.SelfLinker) http.Handler {
return handleInternal(storage, admissionControl, mapper, selfLinker)
return handleInternal(true, storage, admissionControl, selfLinker)
}
func handleInternal(storage map[string]rest.Storage, admissionControl admission.Interface, mapper meta.RESTMapper, selfLinker runtime.SelfLinker) http.Handler {
func handleInternal(legacy bool, storage map[string]rest.Storage, admissionControl admission.Interface, selfLinker runtime.SelfLinker) http.Handler {
group := &APIGroupVersion{
Storage: storage,
Mapper: mapper,
Root: "/api",
Root: "/api",
Version: testVersion,
Creater: api.Scheme,
Typer: api.Scheme,
Codec: codec,
Linker: selfLinker,
Creater: api.Scheme,
Convertor: api.Scheme,
Typer: api.Scheme,
Linker: selfLinker,
Admit: admissionControl,
Context: requestContextMapper,
}
if legacy {
group.Version = testVersion
group.ServerVersion = "v1beta1"
group.Codec = legacyCodec
group.Mapper = legacyNamespaceMapper
} else {
group.Version = testVersion2
group.ServerVersion = "v1beta3"
group.Codec = codec
group.Mapper = namespaceMapper
}
container := restful.NewContainer()
container.Router(restful.CurlyRouter{})
mux := container.ServeMux
group.InstallREST(container)
if err := group.InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err))
}
ws := new(restful.WebService)
InstallSupport(mux, ws)
container.Add(ws)
@@ -244,6 +292,8 @@ func (storage *SimpleRESTStorage) List(ctx api.Context, label labels.Selector, f
result := &SimpleList{
Items: storage.list,
}
storage.requestedLabelSelector = label
storage.requestedFieldSelector = field
return result, storage.errors["list"]
}
@@ -522,15 +572,60 @@ func TestList(t *testing.T) {
namespace string
selfLink string
legacy bool
label string
field string
}{
{"/api/version/simple", "", "/api/version/simple?namespace=", true},
{"/api/version/simple?namespace=other", "other", "/api/version/simple?namespace=other", true},
{
url: "/api/version/simple",
namespace: "",
selfLink: "/api/version/simple?namespace=",
legacy: true,
},
{
url: "/api/version/simple?namespace=other",
namespace: "other",
selfLink: "/api/version/simple?namespace=other",
legacy: true,
},
{
url: "/api/version/simple?namespace=other&labels=a%3Db&fields=c%3Dd",
namespace: "other",
selfLink: "/api/version/simple?namespace=other",
legacy: true,
label: "a=b",
field: "c=d",
},
// list items across all namespaces
{"/api/version/simple?namespace=", "", "/api/version/simple?namespace=", true},
{"/api/version/namespaces/default/simple", "default", "/api/version/namespaces/default/simple", false},
{"/api/version/namespaces/other/simple", "other", "/api/version/namespaces/other/simple", false},
{
url: "/api/version/simple?namespace=",
namespace: "",
selfLink: "/api/version/simple?namespace=",
legacy: true,
},
// list items in a namespace, v1beta3+
{
url: "/api/version2/namespaces/default/simple",
namespace: "default",
selfLink: "/api/version2/namespaces/default/simple",
},
{
url: "/api/version2/namespaces/other/simple",
namespace: "other",
selfLink: "/api/version2/namespaces/other/simple",
},
{
url: "/api/version2/namespaces/other/simple?labelSelector=a%3Db&fieldSelector=c%3Dd",
namespace: "other",
selfLink: "/api/version2/namespaces/other/simple",
label: "a=b",
field: "c=d",
},
// list items across all namespaces
{"/api/version/simple", "", "/api/version/simple", false},
{
url: "/api/version2/simple",
namespace: "",
selfLink: "/api/version2/simple",
},
}
for i, testCase := range testCases {
storage := map[string]rest.Storage{}
@@ -545,7 +640,7 @@ func TestList(t *testing.T) {
if testCase.legacy {
handler = handleLinker(storage, selfLinker)
} else {
handler = handleInternal(storage, admissionControl, namespaceMapper, selfLinker)
handler = handleInternal(false, storage, admissionControl, selfLinker)
}
server := httptest.NewServer(handler)
defer server.Close()
@@ -557,6 +652,9 @@ func TestList(t *testing.T) {
}
if resp.StatusCode != http.StatusOK {
t.Errorf("%d: unexpected status: %d, Expected: %d, %#v", i, resp.StatusCode, http.StatusOK, resp)
body, _ := ioutil.ReadAll(resp.Body)
t.Logf("%d: body: %s", string(body))
continue
}
// TODO: future, restore get links
if !selfLinker.called {
@@ -567,6 +665,12 @@ func TestList(t *testing.T) {
} else if simpleStorage.actualNamespace != testCase.namespace {
t.Errorf("%d: unexpected resource namespace: %s", i, simpleStorage.actualNamespace)
}
if simpleStorage.requestedLabelSelector == nil || simpleStorage.requestedLabelSelector.String() != testCase.label {
t.Errorf("%d: unexpected label selector: %v", i, simpleStorage.requestedLabelSelector)
}
if simpleStorage.requestedFieldSelector == nil || simpleStorage.requestedFieldSelector.String() != testCase.field {
t.Errorf("%d: unexpected field selector: %v", i, simpleStorage.requestedFieldSelector)
}
}
}
@@ -821,16 +925,16 @@ func TestGetNamespaceSelfLink(t *testing.T) {
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/api/version/namespaces/foo/simple/id",
expectedSet: "/api/version2/namespaces/foo/simple/id",
name: "id",
namespace: "foo",
}
storage["simple"] = &simpleStorage
handler := handleInternal(storage, admissionControl, namespaceMapper, selfLinker)
handler := handleInternal(false, storage, admissionControl, selfLinker)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL + "/api/version/namespaces/foo/simple/id")
resp, err := http.Get(server.URL + "/api/version2/namespaces/foo/simple/id")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -905,7 +1009,7 @@ func TestDeleteWithOptions(t *testing.T) {
item := &api.DeleteOptions{
GracePeriodSeconds: &grace,
}
body, err := codec.Encode(item)
body, err := versionServerCodec.Encode(item)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -966,7 +1070,7 @@ func TestLegacyDeleteIgnoresOptions(t *testing.T) {
defer server.Close()
item := api.NewDeleteOptions(300)
body, err := codec.Encode(item)
body, err := versionServerCodec.Encode(item)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -1575,7 +1679,7 @@ func TestCreateInvokesAdmissionControl(t *testing.T) {
namespace: "other",
expectedSet: "/api/version/foo/bar?namespace=other",
}
handler := handleInternal(map[string]rest.Storage{"foo": &storage}, deny.NewAlwaysDeny(), mapper, selfLinker)
handler := handleInternal(true, map[string]rest.Storage{"foo": &storage}, deny.NewAlwaysDeny(), selfLinker)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}

View File

@@ -294,7 +294,7 @@ func TestProxy(t *testing.T) {
server *httptest.Server
proxyTestPattern string
}{
{namespaceServer, "/api/version/proxy/namespaces/" + item.reqNamespace + "/foo/id" + item.path},
{namespaceServer, "/api/version2/proxy/namespaces/" + item.reqNamespace + "/foo/id" + item.path},
{legacyNamespaceServer, "/api/version/proxy/foo/id" + item.path + "?namespace=" + item.reqNamespace},
}
@@ -348,7 +348,7 @@ func TestProxyUpgrade(t *testing.T) {
server := httptest.NewServer(namespaceHandler)
defer server.Close()
ws, err := websocket.Dial("ws://"+server.Listener.Addr().String()+"/api/version/proxy/namespaces/myns/foo/123", "", "http://127.0.0.1/")
ws, err := websocket.Dial("ws://"+server.Listener.Addr().String()+"/api/version2/proxy/namespaces/myns/foo/123", "", "http://127.0.0.1/")
if err != nil {
t.Fatalf("websocket dial err: %s", err)
}

View File

@@ -105,7 +105,7 @@ func TestRedirectWithNamespaces(t *testing.T) {
for _, item := range table {
simpleStorage.errors["resourceLocation"] = item.err
simpleStorage.resourceLocation = &url.URL{Host: item.id}
resp, err := client.Get(server.URL + "/api/version/redirect/namespaces/other/foo/" + item.id)
resp, err := client.Get(server.URL + "/api/version2/redirect/namespaces/other/foo/" + item.id)
if resp == nil {
t.Fatalf("Unexpected nil resp")
}

View File

@@ -27,8 +27,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/emicklei/go-restful"
@@ -64,9 +62,15 @@ type RequestScope struct {
Namer ScopeNamer
ContextFunc
runtime.Codec
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Resource string
Kind string
APIVersion string
// The version of apiserver resources to use
ServerAPIVersion string
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
@@ -94,26 +98,8 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
}
}
func parseSelectorQueryParams(query url.Values, version, apiResource string) (label labels.Selector, field fields.Selector, err error) {
labelString := query.Get(api.LabelSelectorQueryParam(version))
label, err = labels.Parse(labelString)
if err != nil {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("The 'labels' selector parameter (%s) could not be parsed: %v", labelString, err))
}
convertToInternalVersionFunc := func(label, value string) (newLabel, newValue string, err error) {
return api.Scheme.ConvertFieldLabel(version, apiResource, label, value)
}
fieldString := query.Get(api.FieldSelectorQueryParam(version))
field, err = fields.ParseAndTransformSelector(fieldString, convertToInternalVersionFunc)
if err != nil {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("The 'fields' selector parameter (%s) could not be parsed: %v", fieldString, err))
}
return label, field, nil
}
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction {
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
@@ -125,13 +111,35 @@ func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction {
ctx := scope.ContextFunc(req)
ctx = api.WithNamespace(ctx, namespace)
label, field, err := parseSelectorQueryParams(req.Request.URL.Query(), scope.APIVersion, scope.Resource)
out, err := queryToObject(req.Request.URL.Query(), scope, "ListOptions")
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
opts := *out.(*api.ListOptions)
result, err := r.List(ctx, label, field)
// transform fields
fn := func(label, value string) (newLabel, newValue string, err error) {
return scope.Convertor.ConvertFieldLabel(scope.APIVersion, scope.Kind, label, value)
}
if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
// TODO: allow bad request to set field causes based on query parameters
err = errors.NewBadRequest(err.Error())
errorJSON(err, scope.Codec, w)
return
}
if (opts.Watch || forceWatch) && rw != nil {
watcher, err := rw.Watch(ctx, opts.LabelSelector, opts.FieldSelector, opts.ResourceVersion)
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
serveWatch(watcher, scope, w, req)
return
}
result, err := r.List(ctx, opts.LabelSelector, opts.FieldSelector)
if err != nil {
errorJSON(err, scope.Codec, w)
return
@@ -409,6 +417,27 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
}
}
// queryToObject converts query parameters into a structured internal object by
// kind. The caller must cast the returned object to the matching internal Kind
// to use it.
// TODO: add appropriate structured error responses
func queryToObject(query url.Values, scope RequestScope, kind string) (runtime.Object, error) {
versioned, err := scope.Creater.New(scope.ServerAPIVersion, kind)
if err != nil {
// programmer error
return nil, err
}
if err := scope.Convertor.Convert(&query, versioned); err != nil {
return nil, errors.NewBadRequest(err.Error())
}
out, err := scope.Convertor.ConvertToVersion(versioned, "")
if err != nil {
// programmer error
return nil, err
}
return out, nil
}
// resultFunc is a function that returns a rest result and can be run in a goroutine
type resultFunc func() (runtime.Object, error)

View File

@@ -17,111 +17,38 @@ limitations under the License.
package apiserver
import (
"fmt"
"net/http"
"path"
"reflect"
"regexp"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
"golang.org/x/net/websocket"
)
type WatchHandler struct {
storage map[string]rest.Storage
codec runtime.Codec
linker runtime.SelfLinker
info *APIRequestInfoResolver
}
// setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type.
func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error {
name, err := h.linker.Name(obj)
if err != nil {
return err
}
newURL := *req.URL
newURL.Path = path.Join(req.URL.Path, name)
newURL.RawQuery = ""
newURL.Fragment = ""
return h.linker.SetSelfLink(obj, newURL.String())
}
var connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
func isWebsocketRequest(req *http.Request) bool {
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
}
// ServeHTTP processes watch requests.
func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var verb string
var apiResource string
var httpCode int
reqStart := time.Now()
defer monitor("watch", &verb, &apiResource, &httpCode, reqStart)
if req.Method != "GET" {
httpCode = errorJSON(errors.NewBadRequest(
fmt.Sprintf("unsupported method for watch: %s", req.Method)), h.codec, w)
return
}
requestInfo, err := h.info.GetAPIRequestInfo(req)
if err != nil {
httpCode = errorJSON(errors.NewBadRequest(
fmt.Sprintf("failed to find api request info: %s", err.Error())), h.codec, w)
return
}
verb = requestInfo.Verb
ctx := api.WithNamespace(api.NewContext(), requestInfo.Namespace)
storage := h.storage[requestInfo.Resource]
if storage == nil {
httpCode = errorJSON(errors.NewNotFound(requestInfo.Resource, "Resource"), h.codec, w)
return
}
apiResource = requestInfo.Resource
watcher, ok := storage.(rest.Watcher)
if !ok {
httpCode = errorJSON(errors.NewMethodNotSupported(requestInfo.Resource, "watch"), h.codec, w)
return
}
label, field, err := parseSelectorQueryParams(req.URL.Query(), requestInfo.APIVersion, apiResource)
if err != nil {
httpCode = errorJSON(err, h.codec, w)
return
}
resourceVersion := req.URL.Query().Get("resourceVersion")
watching, err := watcher.Watch(ctx, label, field, resourceVersion)
if err != nil {
httpCode = errorJSON(err, h.codec, w)
return
}
httpCode = http.StatusOK
// TODO: This is one watch per connection. We want to multiplex, so that
// multiple watches of the same thing don't create two watches downstream.
watchServer := &WatchServer{watching, h.codec, func(obj runtime.Object) {
if err := h.setSelfLinkAddName(obj, req); err != nil {
glog.Errorf("Failed to set self link for object %#v", obj)
// serveWatch handles serving requests to the server
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) {
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
if err := setSelfLink(obj, req, scope.Namer); err != nil {
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
}
}}
if isWebsocketRequest(req) {
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req)
if isWebsocketRequest(req.Request) {
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req.Request)
} else {
watchServer.ServeHTTP(w, req)
watchServer.ServeHTTP(w, req.Request)
}
}

View File

@@ -51,13 +51,13 @@ var watchTestTable = []struct {
func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
_ = rest.Watcher(simpleStorage) // Give compile error if this doesn't work.
handler := handle(map[string]rest.Storage{"foo": simpleStorage})
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
server := httptest.NewServer(handler)
defer server.Close()
dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/api/version/watch/foo"
dest.Path = "/api/version/watch/simples"
dest.RawQuery = ""
ws, err := websocket.Dial(dest.String(), "", "http://localhost")
@@ -103,13 +103,13 @@ func TestWatchWebsocket(t *testing.T) {
func TestWatchHTTP(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := handle(map[string]rest.Storage{"foo": simpleStorage})
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
dest, _ := url.Parse(server.URL)
dest.Path = "/api/version/watch/foo"
dest.Path = "/api/version/watch/simples"
dest.RawQuery = ""
request, err := http.NewRequest("GET", dest.String(), nil)
@@ -163,17 +163,13 @@ func TestWatchHTTP(t *testing.T) {
}
func TestWatchParamParsing(t *testing.T) {
api.Scheme.AddFieldLabelConversionFunc(testVersion, "foo",
func(label, value string) (string, string, error) {
return label, value, nil
})
simpleStorage := &SimpleRESTStorage{}
handler := handle(map[string]rest.Storage{"foo": simpleStorage})
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
server := httptest.NewServer(handler)
defer server.Close()
dest, _ := url.Parse(server.URL)
dest.Path = "/api/" + testVersion + "/watch/foo"
dest.Path = "/api/" + testVersion + "/watch/simples"
table := []struct {
rawQuery string
@@ -189,13 +185,13 @@ func TestWatchParamParsing(t *testing.T) {
fieldSelector: "",
namespace: api.NamespaceAll,
}, {
rawQuery: "namespace=default&resourceVersion=314159&" + api.FieldSelectorQueryParam(testVersion) + "=Host%3D&" + api.LabelSelectorQueryParam(testVersion) + "=name%3Dfoo",
rawQuery: "namespace=default&resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo",
resourceVersion: "314159",
labelSelector: "name=foo",
fieldSelector: "Host=",
namespace: api.NamespaceDefault,
}, {
rawQuery: "namespace=watchother&" + api.FieldSelectorQueryParam(testVersion) + "=id%3dfoo&resourceVersion=1492",
rawQuery: "namespace=watchother&fields=id%3dfoo&resourceVersion=1492",
resourceVersion: "1492",
labelSelector: "",
fieldSelector: "id=foo",
@@ -238,14 +234,14 @@ func TestWatchParamParsing(t *testing.T) {
func TestWatchProtocolSelection(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := handle(map[string]rest.Storage{"foo": simpleStorage})
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
server := httptest.NewServer(handler)
defer server.Close()
defer server.CloseClientConnections()
client := http.Client{}
dest, _ := url.Parse(server.URL)
dest.Path = "/api/version/watch/foo"
dest.Path = "/api/version/watch/simples"
dest.RawQuery = ""
table := []struct {