diff --git a/pkg/api/errors/errors.go b/pkg/api/errors/errors.go index e4345761ab1..6507ed8082c 100644 --- a/pkg/api/errors/errors.go +++ b/pkg/api/errors/errors.go @@ -29,6 +29,12 @@ import ( const ( StatusUnprocessableEntity = 422 StatusTooManyRequests = 429 + // HTTP recommendations are for servers to define 5xx error codes + // for scenarios not covered by behavior. In this case, TryAgainLater + // is an indication that a transient server error has occured and the + // client *should* retry, with an optional Retry-After header to specify + // the back off window. + StatusTryAgainLater = 504 ) // StatusError is an error intended for consumption by a REST API server; it can also be @@ -202,6 +208,17 @@ func NewInternalError(err error) error { }} } +// NewTimeoutError returns an error indicating that a timeout occurred before the request +// could be completed. Clients may retry, but the operation may still complete. +func NewTimeoutError(message string) error { + return &StatusError{api.Status{ + Status: api.StatusFailure, + Code: StatusTryAgainLater, + Reason: api.StatusReasonTimeout, + Message: fmt.Sprintf("Timeout: %s", message), + }} +} + // IsNotFound returns true if the specified error was created by NewNotFoundErr. func IsNotFound(err error) bool { return reasonForError(err) == api.StatusReasonNotFound diff --git a/pkg/api/rest/resttest/resttest.go b/pkg/api/rest/resttest/resttest.go index fa3fb7de03e..a5bceff4fd6 100644 --- a/pkg/api/rest/resttest/resttest.go +++ b/pkg/api/rest/resttest/resttest.go @@ -85,12 +85,12 @@ func (t *Tester) TestCreateResetsUserData(valid runtime.Object) { objectMeta.UID = "bad-uid" objectMeta.CreationTimestamp = now - channel, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) + obj, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if obj := <-channel; obj.Object == nil { - t.Fatalf("Unexpected object from channel: %#v", obj) + if obj == nil { + t.Fatalf("Unexpected object from result: %#v", obj) } if objectMeta.UID == "bad-uid" || objectMeta.CreationTimestamp == now { t.Errorf("ObjectMeta did not reset basic fields: %#v", objectMeta) @@ -111,12 +111,12 @@ func (t *Tester) TestCreateHasMetadata(valid runtime.Object) { context = api.NewContext() } - channel, err := t.storage.(apiserver.RESTCreater).Create(context, valid) + obj, err := t.storage.(apiserver.RESTCreater).Create(context, valid) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if obj := <-channel; obj.Object == nil { - t.Fatalf("Unexpected object from channel: %#v", obj) + if obj == nil { + t.Fatalf("Unexpected object from result: %#v", obj) } if !api.HasObjectMetaSystemFieldValues(objectMeta) { t.Errorf("storage did not populate object meta field values") @@ -148,12 +148,8 @@ func (t *Tester) TestCreateGeneratesNameReturnsTryAgain(valid runtime.Object) { objectMeta.GenerateName = "test-" t.withStorageError(errors.NewAlreadyExists("kind", "thing"), func() { - ch, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - res := <-ch - if err := errors.FromObject(res.Object); err == nil || !errors.IsTryAgainLater(err) { + _, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) + if err == nil || !errors.IsTryAgainLater(err) { t.Fatalf("Unexpected error: %v", err) } }) diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index c0b19bb7a00..fc3c3b91574 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -17,20 +17,24 @@ limitations under the License. package apiserver import ( + "fmt" "net/http" + "net/url" + gpath "path" "reflect" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/emicklei/go-restful" ) type APIInstaller struct { - prefix string // Path prefix where API resources are to be registered. - version string // The API version being installed. - restHandler *RESTHandler - mapper meta.RESTMapper + group *APIGroupVersion + prefix string // Path prefix where API resources are to be registered. + version string // The API version being installed. } // Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc). @@ -40,6 +44,9 @@ type action struct { Params []*restful.Parameter // List of parameters associated with the action. } +// errEmptyName is returned when API requests do not fill the name section of the path. +var errEmptyName = fmt.Errorf("name must be provided") + // Installs handlers for API resources. func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { errors = make([]error, 0) @@ -49,16 +56,16 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { // Initialize the custom handlers. watchHandler := (&WatchHandler{ - storage: a.restHandler.storage, - codec: a.restHandler.codec, - canonicalPrefix: a.restHandler.canonicalPrefix, - selfLinker: a.restHandler.selfLinker, - apiRequestInfoResolver: a.restHandler.apiRequestInfoResolver, + storage: a.group.storage, + codec: a.group.codec, + prefix: a.group.prefix, + linker: a.group.linker, + info: a.group.info, }) - redirectHandler := (&RedirectHandler{a.restHandler.storage, a.restHandler.codec, a.restHandler.apiRequestInfoResolver}) - proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.restHandler.storage, a.restHandler.codec, a.restHandler.apiRequestInfoResolver}) + redirectHandler := (&RedirectHandler{a.group.storage, a.group.codec, a.group.info}) + proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.storage, a.group.codec, a.group.info}) - for path, storage := range a.restHandler.storage { + for path, storage := range a.group.storage { if err := a.registerResourceHandlers(path, storage, ws, watchHandler, redirectHandler, proxyHandler); err != nil { errors = append(errors, err) } @@ -78,8 +85,11 @@ func (a *APIInstaller) newWebService() *restful.WebService { } func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage, ws *restful.WebService, watchHandler http.Handler, redirectHandler http.Handler, proxyHandler http.Handler) error { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). - restVerbHandler := restfulStripPrefix(a.prefix, a.restHandler) + codec := a.group.codec + admit := a.group.admit + linker := a.group.linker + resource := path + object := storage.New() // TODO: add scheme to APIInstaller rather than using api.Scheme _, kind, err := api.Scheme.ObjectVersionAndKind(object) @@ -103,28 +113,31 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage versionedList = indirectArbitraryPointer(versionedListPtr) } - mapping, err := a.mapper.RESTMapping(kind, a.version) + mapping, err := a.group.mapper.RESTMapping(kind, a.version) if err != nil { return err } // what verbs are supported by the storage, used to know what verbs we support per path storageVerbs := map[string]bool{} - if _, ok := storage.(RESTCreater); ok { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). + creater, ok := storage.(RESTCreater) + if ok { storageVerbs["RESTCreater"] = true } - if _, ok := storage.(RESTLister); ok { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). + lister, ok := storage.(RESTLister) + if ok { storageVerbs["RESTLister"] = true } - if _, ok := storage.(RESTGetter); ok { + getter, ok := storage.(RESTGetter) + if ok { storageVerbs["RESTGetter"] = true } - if _, ok := storage.(RESTDeleter); ok { + deleter, ok := storage.(RESTDeleter) + if ok { storageVerbs["RESTDeleter"] = true } - if _, ok := storage.(RESTUpdater); ok { + updater, ok := storage.(RESTUpdater) + if ok { storageVerbs["RESTUpdater"] = true } if _, ok := storage.(ResourceWatcher); ok { @@ -134,6 +147,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage storageVerbs["Redirector"] = true } + var namespaceFn ResourceNamespaceFunc + var nameFn ResourceNameFunc + var generateLinkFn linkFunc + var objNameFn ObjectNameFunc + linkFn := func(req *restful.Request, obj runtime.Object) error { + return setSelfLink(obj, req.Request, a.group.linker, generateLinkFn) + } + allowWatchList := storageVerbs["ResourceWatcher"] && storageVerbs["RESTLister"] // watching on lists is allowed only for kinds that support both watch and list. scope := mapping.Scope nameParam := ws.PathParameter("name", "name of the "+kind).DataType("string") @@ -141,6 +162,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage actions := []action{} // Get the list of actions for the given scope. if scope.Name() != meta.RESTScopeNameNamespace { + objNameFn = func(obj runtime.Object) (namespace, name string, err error) { + name, err = linker.Name(obj) + if len(name) == 0 { + err = errEmptyName + } + return + } + // Handler for standard REST verbs (GET, PUT, POST and DELETE). actions = appendIf(actions, action{"LIST", path, params}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", path, params}, storageVerbs["RESTCreater"]) @@ -148,6 +177,22 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage itemPath := path + "/{name}" nameParams := append(params, nameParam) + namespaceFn = func(req *restful.Request) (namespace string, err error) { + return + } + nameFn = func(req *restful.Request) (namespace, name string, err error) { + name = req.PathParameter("name") + if len(name) == 0 { + err = errEmptyName + } + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, 1) + path = gpath.Join(a.prefix, path) + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -156,18 +201,49 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage actions = appendIf(actions, action{"PROXY", "/proxy/" + itemPath + "/{path:*}", nameParams}, storageVerbs["Redirector"]) actions = appendIf(actions, action{"PROXY", "/proxy/" + itemPath, nameParams}, storageVerbs["Redirector"]) } else { + objNameFn = func(obj runtime.Object) (namespace, name string, err error) { + if name, err = linker.Name(obj); err != nil { + return + } + namespace, err = linker.Namespace(obj) + return + } + // v1beta3 format with namespace in path if scope.ParamPath() { // Handler for standard REST verbs (GET, PUT, POST and DELETE). namespaceParam := ws.PathParameter(scope.ParamName(), scope.ParamDescription()).DataType("string") namespacedPath := scope.ParamName() + "/{" + scope.ParamName() + "}/" + path namespaceParams := []*restful.Parameter{namespaceParam} + namespaceFn = func(req *restful.Request) (namespace string, err error) { + namespace = req.PathParameter(scope.ParamName()) + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return + } + actions = appendIf(actions, action{"LIST", namespacedPath, namespaceParams}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", namespacedPath, namespaceParams}, storageVerbs["RESTCreater"]) actions = appendIf(actions, action{"WATCHLIST", "/watch/" + namespacedPath, namespaceParams}, allowWatchList) itemPath := namespacedPath + "/{name}" nameParams := append(namespaceParams, nameParam) + nameFn = func(req *restful.Request) (namespace, name string, err error) { + namespace, _ = namespaceFn(req) + name = req.PathParameter("name") + if len(name) == 0 { + err = errEmptyName + } + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, 1) + path = strings.Replace(path, "{"+scope.ParamName()+"}", namespace, 1) + path = gpath.Join(a.prefix, path) + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -184,12 +260,39 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage // v1beta1/v1beta2 format where namespace was a query parameter namespaceParam := ws.QueryParameter(scope.ParamName(), scope.ParamDescription()).DataType("string") namespaceParams := []*restful.Parameter{namespaceParam} + namespaceFn = func(req *restful.Request) (namespace string, err error) { + namespace = req.QueryParameter(scope.ParamName()) + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return + } + actions = appendIf(actions, action{"LIST", path, namespaceParams}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", path, namespaceParams}, storageVerbs["RESTCreater"]) actions = appendIf(actions, action{"WATCHLIST", "/watch/" + path, namespaceParams}, allowWatchList) itemPath := path + "/{name}" nameParams := append(namespaceParams, nameParam) + nameFn = func(req *restful.Request) (namespace, name string, err error) { + namespace, _ = namespaceFn(req) + name = req.PathParameter("name") + if len(name) == 0 { + err = errEmptyName + } + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, -1) + path = gpath.Join(a.prefix, path) + if len(namespace) > 0 { + values := make(url.Values) + values.Set(scope.ParamName(), namespace) + query = values.Encode() + } + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -218,43 +321,50 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage // test/integration/auth_test.go is currently the most comprehensive status code test for _, action := range actions { + m := monitorFilter(action.Verb, resource) switch action.Verb { case "GET": // Get a resource. - route := ws.GET(action.Path).To(restVerbHandler). + route := ws.GET(action.Path).To(GetResource(getter, nameFn, linkFn, codec)). + Filter(m). Doc("read the specified " + kind). Operation("read" + kind). Writes(versionedObject) addParams(route, action.Params) ws.Route(route) case "LIST": // List all resources of a kind. - route := ws.GET(action.Path).To(restVerbHandler). + route := ws.GET(action.Path).To(ListResource(lister, namespaceFn, linkFn, codec)). + Filter(m). Doc("list objects of kind " + kind). Operation("list" + kind). Writes(versionedList) addParams(route, action.Params) ws.Route(route) case "PUT": // Update a resource. - route := ws.PUT(action.Path).To(restVerbHandler). + route := ws.PUT(action.Path).To(UpdateResource(updater, nameFn, objNameFn, linkFn, codec, resource, admit)). + Filter(m). Doc("update the specified " + kind). Operation("update" + kind). Reads(versionedObject) addParams(route, action.Params) ws.Route(route) case "POST": // Create a resource. - route := ws.POST(action.Path).To(restVerbHandler). + route := ws.POST(action.Path).To(CreateResource(creater, namespaceFn, linkFn, codec, resource, admit)). + Filter(m). Doc("create a " + kind). Operation("create" + kind). Reads(versionedObject) addParams(route, action.Params) ws.Route(route) case "DELETE": // Delete a resource. - route := ws.DELETE(action.Path).To(restVerbHandler). + route := ws.DELETE(action.Path).To(DeleteResource(deleter, nameFn, linkFn, codec, resource, kind, admit)). + Filter(m). Doc("delete a " + kind). Operation("delete" + kind) addParams(route, action.Params) ws.Route(route) case "WATCH": // Watch a resource. route := ws.GET(action.Path).To(restfulStripPrefix(a.prefix+"/watch", watchHandler)). + Filter(m). Doc("watch a particular " + kind). Operation("watch" + kind). Writes(versionedObject) @@ -262,6 +372,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "WATCHLIST": // Watch all resources of a kind. route := ws.GET(action.Path).To(restfulStripPrefix(a.prefix+"/watch", watchHandler)). + Filter(m). Doc("watch a list of " + kind). Operation("watch" + kind + "list"). Writes(versionedList) @@ -269,6 +380,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "REDIRECT": // Get the redirect URL for a resource. route := ws.GET(action.Path).To(restfulStripPrefix(a.prefix+"/redirect", redirectHandler)). + Filter(m). Doc("redirect GET request to " + kind). Operation("redirect" + kind). Produces("*/*"). @@ -277,10 +389,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "PROXY": // Proxy requests to a resource. // Accept all methods as per https://github.com/GoogleCloudPlatform/kubernetes/issues/3996 - addProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, kind, action.Params) - addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, action.Params) - addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, action.Params) - addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, action.Params) + addProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + default: + return fmt.Errorf("unrecognized action verb: %s", action.Verb) } // Note: update GetAttribs() when adding a custom handler. } @@ -306,8 +420,9 @@ func restfulStripPrefix(prefix string, handler http.Handler) restful.RouteFuncti } } -func addProxyRoute(ws *restful.WebService, method string, prefix string, path string, proxyHandler http.Handler, kind string, params []*restful.Parameter) { +func addProxyRoute(ws *restful.WebService, method string, prefix string, path string, proxyHandler http.Handler, kind, resource string, params []*restful.Parameter) { proxyRoute := ws.Method(method).Path(path).To(restfulStripPrefix(prefix+"/proxy", proxyHandler)). + Filter(monitorFilter("PROXY", resource)). Doc("proxy " + method + " requests to " + kind). Operation("proxy" + method + kind). Produces("*/*"). diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 738639eb257..68db292b60a 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -73,6 +73,15 @@ func monitor(handler, verb, resource string, httpCode int, reqStart time.Time) { requestLatencies.WithLabelValues(handler, verb).Observe(float64((time.Since(reqStart)) / time.Microsecond)) } +// monitorFilter creates a filter that reports the metrics for a given resource and action. +func monitorFilter(action, resource string) restful.FilterFunction { + return func(req *restful.Request, res *restful.Response, chain *restful.FilterChain) { + reqStart := time.Now() + chain.ProcessFilter(req, res) + monitor("rest", action, resource, res.StatusCode(), reqStart) + } +} + // mux is an object that can register http handlers. type Mux interface { Handle(pattern string, handler http.Handler) @@ -89,9 +98,9 @@ type defaultAPIServer struct { // as RESTful resources at prefix, serialized by codec, and also includes the support // http resources. // Note: This method is used only in tests. -func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, selfLinker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) http.Handler { - prefix := root + "/" + version - group := NewAPIGroupVersion(storage, codec, root, prefix, selfLinker, admissionControl, mapper) +func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, linker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) http.Handler { + prefix := path.Join(root, version) + group := NewAPIGroupVersion(storage, codec, root, prefix, linker, admissionControl, mapper) container := restful.NewContainer() container.Router(restful.CurlyRouter{}) mux := container.ServeMux @@ -102,16 +111,19 @@ func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, ve return &defaultAPIServer{mux, group} } -// TODO: This is a whole API version right now. Maybe should rename it. -// APIGroupVersion is a http.Handler that exposes multiple RESTStorage objects +// APIGroupVersion is a helper for exposing RESTStorage objects as http.Handlers via go-restful // It handles URLs of the form: // /${storage_key}[/${object_name}] // Where 'storage_key' points to a RESTStorage object stored in storage. -// -// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing. type APIGroupVersion struct { - handler RESTHandler + storage map[string]RESTStorage + codec runtime.Codec + prefix string + linker runtime.SelfLinker + admit admission.Interface mapper meta.RESTMapper + // TODO: put me into a cleaner interface + info *APIRequestInfoResolver } // NewAPIGroupVersion returns an object that will serve a set of REST resources and their @@ -119,18 +131,15 @@ type APIGroupVersion struct { // This is a helper method for registering multiple sets of REST handlers under different // prefixes onto a server. // TODO: add multitype codec serialization -func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, apiRoot, canonicalPrefix string, selfLinker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) *APIGroupVersion { +func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, root, prefix string, linker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) *APIGroupVersion { return &APIGroupVersion{ - handler: RESTHandler{ - storage: storage, - codec: codec, - canonicalPrefix: canonicalPrefix, - selfLinker: selfLinker, - ops: NewOperations(), - admissionControl: admissionControl, - apiRequestInfoResolver: &APIRequestInfoResolver{util.NewStringSet(apiRoot), latest.RESTMapper}, - }, - mapper: mapper, + storage: storage, + codec: codec, + prefix: prefix, + linker: linker, + admit: admissionControl, + mapper: mapper, + info: &APIRequestInfoResolver{util.NewStringSet(root), latest.RESTMapper}, } } @@ -139,7 +148,12 @@ func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, api // in a slash. A restful WebService is created for the group and version. func (g *APIGroupVersion) InstallREST(container *restful.Container, root string, version string) error { prefix := path.Join(root, version) - ws, registrationErrors := (&APIInstaller{prefix, version, &g.handler, g.mapper}).Install() + installer := &APIInstaller{ + group: g, + prefix: prefix, + version: version, + } + ws, registrationErrors := installer.Install() container.Add(ws) return errors.NewAggregate(registrationErrors) } @@ -186,15 +200,15 @@ func AddApiWebService(container *restful.Container, apiPrefix string, versions [ // TODO: InstallREST should register each version automatically versionHandler := APIVersionHandler(versions[:]...) - getApiVersionsWebService := new(restful.WebService) - getApiVersionsWebService.Path(apiPrefix) - getApiVersionsWebService.Doc("get available api versions") - getApiVersionsWebService.Route(getApiVersionsWebService.GET("/").To(versionHandler). - Doc("get available api versions"). - Operation("getApiVersions"). + ws := new(restful.WebService) + ws.Path(apiPrefix) + ws.Doc("get available API versions") + ws.Route(ws.GET("/").To(versionHandler). + Doc("get available API versions"). + Operation("getAPIVersions"). Produces(restful.MIME_JSON). Consumes(restful.MIME_JSON)) - container.Add(getApiVersionsWebService) + container.Add(ws) } // handleVersion writes the server's version information. diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 2adca83fbce..55be37e4f30 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -180,17 +180,17 @@ func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Objec return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"] } -func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (runtime.Object, error) { storage.deleted = id if err := storage.errors["delete"]; err != nil { return nil, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}}) - } - return &api.Status{Status: api.StatusSuccess}, nil - }), nil + var obj runtime.Object = &api.Status{Status: api.StatusSuccess} + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}}) + } + return obj, err } func (storage *SimpleRESTStorage) New() runtime.Object { @@ -201,30 +201,28 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object { return &SimpleList{} } -func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { storage.created = obj.(*Simple) if err := storage.errors["create"]; err != nil { return nil, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(obj) - } - return obj, nil - }), nil + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(obj) + } + return obj, err } -func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { storage.updated = obj.(*Simple) if err := storage.errors["update"]; err != nil { - return nil, err + return nil, false, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(obj) - } - return obj, nil - }), nil + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(obj) + } + return obj, false, err } // Implement ResourceWatcher. @@ -489,7 +487,9 @@ func TestGet(t *testing.T) { } selfLinker := &setTestSelfLinker{ t: t, - expectedSet: "/prefix/version/simple/id", + expectedSet: "/prefix/version/simple/id?namespace=default", + name: "id", + namespace: "default", } storage["simple"] = &simpleStorage handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) @@ -497,6 +497,12 @@ func TestGet(t *testing.T) { defer server.Close() resp, err := http.Get(server.URL + "/prefix/version/simple/id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } var itemOut Simple body, err := extractBody(resp, &itemOut) if err != nil { @@ -511,6 +517,81 @@ func TestGet(t *testing.T) { } } +func TestGetAlternateSelfLink(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/prefix/version/simple/id?namespace=test", + name: "id", + namespace: "test", + } + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, legacyNamespaceMapper) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/prefix/version/simple/id?namespace=test") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } +} + +func TestGetNamespaceSelfLink(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/prefix/version/namespaces/foo/simple/id", + name: "id", + namespace: "foo", + } + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, namespaceMapper) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/prefix/version/namespaces/foo/simple/id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } +} func TestGetMissing(t *testing.T) { storage := map[string]RESTStorage{} simpleStorage := SimpleRESTStorage{ @@ -542,11 +623,13 @@ func TestDelete(t *testing.T) { client := http.Client{} request, err := http.NewRequest("DELETE", server.URL+"/prefix/version/simple/"+ID, nil) - _, err = client.Do(request) + res, err := client.Do(request) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) + } + if res.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", res) } - if simpleStorage.deleted != ID { t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID) } @@ -602,13 +685,19 @@ func TestUpdate(t *testing.T) { storage["simple"] = &simpleStorage selfLinker := &setTestSelfLinker{ t: t, - expectedSet: "/prefix/version/simple/" + ID, + expectedSet: "/prefix/version/simple/" + ID + "?namespace=default", + name: ID, + namespace: api.NamespaceDefault, } handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) server := httptest.NewServer(handler) defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: "", // update should allow the client to send an empty namespace + }, Other: "bar", } body, err := codec.Encode(item) @@ -637,15 +726,15 @@ func TestUpdateInvokesAdmissionControl(t *testing.T) { simpleStorage := SimpleRESTStorage{} ID := "id" storage["simple"] = &simpleStorage - selfLinker := &setTestSelfLinker{ - t: t, - expectedSet: "/prefix/version/simple/" + ID, - } handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny(), mapper) server := httptest.NewServer(handler) defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: api.NamespaceDefault, + }, Other: "bar", } body, err := codec.Encode(item) @@ -665,6 +754,100 @@ func TestUpdateInvokesAdmissionControl(t *testing.T) { } } +func TestUpdateRequiresMatchingName(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny(), mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusBadRequest { + t.Errorf("Unexpected response %#v", response) + } +} + +func TestUpdateAllowsMissingNamespace(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + }, + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusOK { + t.Errorf("Unexpected response %#v", response) + } +} + +func TestUpdatePreventsMismatchedNamespace(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: "other", + }, + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusBadRequest { + t.Errorf("Unexpected response %#v", response) + } +} + func TestUpdateMissing(t *testing.T) { storage := map[string]RESTStorage{} ID := "id" @@ -677,6 +860,10 @@ func TestUpdateMissing(t *testing.T) { defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: api.NamespaceDefault, + }, Other: "bar", } body, err := codec.Encode(item) @@ -690,7 +877,6 @@ func TestUpdateMissing(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if response.StatusCode != http.StatusNotFound { t.Errorf("Unexpected response %#v", response) } @@ -806,7 +992,7 @@ func TestCreate(t *testing.T) { if !reflect.DeepEqual(&itemOut, simple) { t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) } - if response.StatusCode != http.StatusOK { + if response.StatusCode != http.StatusCreated { t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response) } if !selfLinker.called { @@ -961,7 +1147,7 @@ func TestCreateTimeout(t *testing.T) { simple := &Simple{Other: "foo"} data, _ := codec.Encode(simple) - itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, http.StatusAccepted) + itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, apierrs.StatusTryAgainLater) if itemOut.Status != api.StatusFailure || itemOut.Reason != api.StatusReasonTimeout { t.Errorf("Unexpected status %#v", itemOut) } diff --git a/pkg/apiserver/async.go b/pkg/apiserver/async.go index a96a151be34..ca1d0aa32cf 100644 --- a/pkg/apiserver/async.go +++ b/pkg/apiserver/async.go @@ -45,28 +45,3 @@ func MakeAsync(fn WorkFunc) <-chan RESTResult { }() return channel } - -// WorkFunc is used to perform any time consuming work for an api call, after -// the input has been validated. Pass one of these to MakeAsync to create an -// appropriate return value for the Update, Delete, and Create methods. -type WorkResultFunc func() (result RESTResult, err error) - -// MakeAsync takes a function and executes it, delivering the result in the way required -// by RESTStorage's Update, Delete, and Create methods. -func MakeAsyncResult(fn WorkResultFunc) <-chan RESTResult { - channel := make(chan RESTResult) - go func() { - defer util.HandleCrash() - obj, err := fn() - if err != nil { - channel <- RESTResult{Object: errToAPIStatus(err)} - } else { - channel <- obj - } - // 'close' is used to signal that no further values will - // be written to the channel. Not strictly necessary, but - // also won't hurt. - close(channel) - }() - return channel -} diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index 91b7e564147..a05d7afffb7 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -35,7 +35,21 @@ func errToAPIStatus(err error) *api.Status { switch t := err.(type) { case statusError: status := t.Status() - status.Status = api.StatusFailure + if len(status.Status) == 0 { + } + switch status.Status { + case api.StatusSuccess: + if status.Code == 0 { + status.Code = http.StatusOK + } + case "": + status.Status = api.StatusFailure + fallthrough + case api.StatusFailure: + if status.Code == 0 { + status.Code = http.StatusInternalServerError + } + } //TODO: check for invalid responses return &status default: diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 94f58a6747e..233bd12236e 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -52,20 +52,29 @@ type RESTDeleter interface { // Delete finds a resource in the storage and deletes it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. - Delete(ctx api.Context, id string) (<-chan RESTResult, error) + // Delete *may* return the object that was deleted, or a status object indicating additional + // information about deletion. + Delete(ctx api.Context, id string) (runtime.Object, error) } type RESTCreater interface { + // New returns an empty object that can be used with Create after request data has been put into it. + // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) + New() runtime.Object + // Create creates a new version of a resource. - Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) + Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) } type RESTUpdater interface { + // New returns an empty object that can be used with Update after request data has been put into it. + // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) + New() runtime.Object + // Update finds a resource in the storage and updates it. Some implementations - // may allow updates creates the object - they should set the Created flag of - // the returned RESTResultto true. In the event of an asynchronous error returned - // via an api.Status object, the Created flag is ignored. - Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) + // may allow updates creates the object - they should set the created boolean + // to true. + Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) } // RESTResult indicates the result of a REST transformation. diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index e8cb3a35e92..b09be207367 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -18,7 +18,6 @@ package apiserver import ( "net/http" - "path" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" @@ -27,73 +26,325 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/golang/glog" + "github.com/emicklei/go-restful" ) -// RESTHandler implements HTTP verbs on a set of RESTful resources identified by name. -type RESTHandler struct { - storage map[string]RESTStorage - codec runtime.Codec - canonicalPrefix string - selfLinker runtime.SelfLinker - ops *Operations - admissionControl admission.Interface - apiRequestInfoResolver *APIRequestInfoResolver +// ResourceNameFunc returns a name (and optional namespace) given a request - if no name is present +// an error must be returned. +type ResourceNameFunc func(req *restful.Request) (namespace, name string, err error) + +// ObjectNameFunc returns the name (and optional namespace) of an object +type ObjectNameFunc func(obj runtime.Object) (namespace, name string, err error) + +// ResourceNamespaceFunc returns the namespace associated with the given request - if no namespace +// is present an error must be returned. +type ResourceNamespaceFunc func(req *restful.Request) (namespace string, err error) + +// LinkResourceFunc updates the provided object with a SelfLink that is appropriate for the current +// request. +type LinkResourceFunc func(req *restful.Request, obj runtime.Object) error + +// GetResource returns a function that handles retrieving a single resource from a RESTStorage object. +func GetResource(r RESTGetter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + item, err := r.Get(ctx, name) + if err != nil { + errorJSON(err, codec, w) + return + } + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + writeJSON(http.StatusOK, codec, item, w) + } } -// ServeHTTP handles requests to all RESTStorage objects. -func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - var verb string - var apiResource string - var httpCode int - reqStart := time.Now() - defer func() { monitor("rest", verb, apiResource, httpCode, reqStart) }() +// ListResource returns a function that handles retrieving a list of resources from a RESTStorage object. +func ListResource(r RESTLister, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter - requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) - if err != nil { - glog.Errorf("Unable to handle request %s %s %v", requestInfo.Namespace, requestInfo.Kind, err) - notFound(w, req) - httpCode = http.StatusNotFound - return + namespace, err := namespaceFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + label, err := labels.ParseSelector(req.Request.URL.Query().Get("labels")) + if err != nil { + errorJSON(err, codec, w) + return + } + field, err := labels.ParseSelector(req.Request.URL.Query().Get("fields")) + if err != nil { + errorJSON(err, codec, w) + return + } + + item, err := r.List(ctx, label, field) + if err != nil { + errorJSON(err, codec, w) + return + } + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + writeJSON(http.StatusOK, codec, item, w) } - verb = requestInfo.Verb - - storage, ok := h.storage[requestInfo.Resource] - if !ok { - notFound(w, req) - httpCode = http.StatusNotFound - return - } - apiResource = requestInfo.Resource - - httpCode = h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource) } -// Sets the SelfLink field of the object. -func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error { - newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path) - newURL.RawQuery = "" - newURL.Fragment = "" - namespace, err := h.selfLinker.Namespace(obj) +// CreateResource returns a function that will handle a resource creation. +func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, err := namespaceFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + body, err := readBody(req.Request) + if err != nil { + errorJSON(err, codec, w) + return + } + + obj := r.New() + if err := codec.DecodeInto(body, obj); err != nil { + errorJSON(err, codec, w) + return + } + + err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "CREATE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(timeout, func() (runtime.Object, error) { + return r.Create(ctx, obj) + }) + if err != nil { + errorJSON(err, codec, w) + return + } + + if err := linkFn(req, result); err != nil { + errorJSON(err, codec, w) + return + } + + writeJSON(http.StatusCreated, codec, result, w) + } +} + +// UpdateResource returns a function that will handle a resource update +func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + body, err := readBody(req.Request) + if err != nil { + errorJSON(err, codec, w) + return + } + + obj := r.New() + if err := codec.DecodeInto(body, obj); err != nil { + errorJSON(err, codec, w) + return + } + + objNamespace, objName, err := objNameFunc(obj) + if err != nil { + errorJSON(err, codec, w) + return + } + if objName != name { + errorJSON(errors.NewBadRequest("the name of the object does not match the name on the URL"), codec, w) + return + } + if len(namespace) > 0 { + if len(objNamespace) > 0 && objNamespace != namespace { + errorJSON(errors.NewBadRequest("the namespace of the object does not match the namespace on the request"), codec, w) + return + } + } + + err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "UPDATE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + wasCreated := false + result, err := finishRequest(timeout, func() (runtime.Object, error) { + obj, created, err := r.Update(ctx, obj) + wasCreated = created + return obj, err + }) + if err != nil { + errorJSON(err, codec, w) + return + } + + if err := linkFn(req, result); err != nil { + errorJSON(err, codec, w) + return + } + + status := http.StatusOK + if wasCreated { + status = http.StatusCreated + } + writeJSON(status, codec, result, w) + } +} + +// DeleteResource returns a function that will handle a resource deletion +func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource, kind string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + err = admit.Admit(admission.NewAttributesRecord(nil, namespace, resource, "DELETE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(timeout, func() (runtime.Object, error) { + return r.Delete(ctx, name) + }) + if err != nil { + errorJSON(err, codec, w) + return + } + + // if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid + // object with the response. + if result == nil { + result = &api.Status{ + Status: api.StatusSuccess, + Code: http.StatusOK, + Details: &api.StatusDetails{ + ID: name, + Kind: kind, + }, + } + } else { + // when a non-status response is returned, set the self link + if _, ok := result.(*api.Status); !ok { + if err := linkFn(req, result); err != nil { + errorJSON(err, codec, w) + return + } + } + } + writeJSON(http.StatusOK, codec, result, w) + } +} + +// resultFunc is a function that returns a rest result and can be run in a goroutine +type resultFunc func() (runtime.Object, error) + +// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response. +// Any api.Status object returned is considered an "error", which interrupts the normal response flow. +func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) { + ch := make(chan runtime.Object) + errCh := make(chan error) + go func() { + if result, err := fn(); err != nil { + errCh <- err + } else { + ch <- result + } + }() + + select { + case result = <-ch: + if status, ok := result.(*api.Status); ok { + return nil, errors.FromObject(status) + } + return result, nil + case err = <-errCh: + return nil, err + case <-time.After(timeout): + return nil, errors.NewTimeoutError("request did not complete within allowed duration") + } +} + +type linkFunc func(namespace, name string) (path string, query string) + +// setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request +// plus the path and query generated by the provided linkFunc +func setSelfLink(obj runtime.Object, req *http.Request, linker runtime.SelfLinker, fn linkFunc) error { + namespace, err := linker.Namespace(obj) if err != nil { return err } - - // we need to add namespace as a query param, if its not in the resource path - if len(namespace) > 0 { - parts := splitPath(req.URL.Path) - if parts[0] != "ns" { - query := newURL.Query() - query.Set("namespace", namespace) - newURL.RawQuery = query.Encode() - } - } - - err = h.selfLinker.SetSelfLink(obj, newURL.String()) + name, err := linker.Name(obj) if err != nil { return err } + path, query := fn(namespace, name) + + newURL := *req.URL + newURL.Path = path + newURL.RawQuery = query + newURL.Fragment = "" + + if err := linker.SetSelfLink(obj, newURL.String()); err != nil { + return err + } if !runtime.IsListType(obj) { return nil } @@ -104,231 +355,9 @@ func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error { return err } for i := range items { - if err := h.setSelfLinkAddName(items[i], req); err != nil { + if err := setSelfLink(items[i], req, linker, fn); err != nil { return err } } return runtime.SetList(obj, items) } - -// Like setSelfLink, but appends the object's name. -func (h *RESTHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { - name, err := h.selfLinker.Name(obj) - if err != nil { - return err - } - namespace, err := h.selfLinker.Namespace(obj) - if err != nil { - return err - } - newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name) - newURL.RawQuery = "" - newURL.Fragment = "" - // we need to add namespace as a query param, if its not in the resource path - if len(namespace) > 0 { - parts := splitPath(req.URL.Path) - if parts[0] != "ns" { - query := newURL.Query() - query.Set("namespace", namespace) - newURL.RawQuery = query.Encode() - } - } - return h.selfLinker.SetSelfLink(obj, newURL.String()) -} - -// curry adapts either of the self link setting functions into a function appropriate for operation's hook. -func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(RESTResult) { - return func(obj RESTResult) { - if err := f(obj.Object, req); err != nil { - glog.Errorf("unable to set self link for %#v: %v", obj, err) - } - } -} - -// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then -// on path length, according to the following table: -// Method Path Action -// GET /foo list -// GET /foo/bar get 'bar' -// POST /foo create -// PUT /foo/bar update 'bar' -// DELETE /foo/bar delete 'bar' -// Responds with a 404 if the method/pattern doesn't match one of these entries. -// The s accepts several query parameters: -// timeout= Timeout for synchronous requests -// labels= Used for filtering list operations -// Returns the HTTP status code written to the response. -func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) int { - ctx := api.WithNamespace(api.NewContext(), namespace) - // TODO: Document the timeout query parameter. - timeout := parseTimeout(req.URL.Query().Get("timeout")) - switch req.Method { - case "GET": - switch len(parts) { - case 1: - label, err := labels.ParseSelector(req.URL.Query().Get("labels")) - if err != nil { - return errorJSON(err, h.codec, w) - } - field, err := labels.ParseSelector(req.URL.Query().Get("fields")) - if err != nil { - return errorJSON(err, h.codec, w) - } - lister, ok := storage.(RESTLister) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w) - } - list, err := lister.List(ctx, label, field) - if err != nil { - return errorJSON(err, h.codec, w) - } - if err := h.setSelfLink(list, req); err != nil { - return errorJSON(err, h.codec, w) - } - writeJSON(http.StatusOK, h.codec, list, w) - case 2: - getter, ok := storage.(RESTGetter) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w) - } - item, err := getter.Get(ctx, parts[1]) - if err != nil { - return errorJSON(err, h.codec, w) - } - if err := h.setSelfLink(item, req); err != nil { - return errorJSON(err, h.codec, w) - } - writeJSON(http.StatusOK, h.codec, item, w) - default: - notFound(w, req) - return http.StatusNotFound - } - - case "POST": - if len(parts) != 1 { - notFound(w, req) - return http.StatusNotFound - } - creater, ok := storage.(RESTCreater) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) - } - - body, err := readBody(req) - if err != nil { - return errorJSON(err, h.codec, w) - } - obj := storage.New() - err = h.codec.DecodeInto(body, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - - // invoke admission control - err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := creater.Create(ctx, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req)) - return h.finishReq(op, req, w) - - case "DELETE": - if len(parts) != 2 { - notFound(w, req) - return http.StatusNotFound - } - deleter, ok := storage.(RESTDeleter) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w) - } - - // invoke admission control - err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := deleter.Delete(ctx, parts[1]) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, nil) - return h.finishReq(op, req, w) - - case "PUT": - if len(parts) != 2 { - notFound(w, req) - return http.StatusNotFound - } - updater, ok := storage.(RESTUpdater) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) - } - - body, err := readBody(req) - if err != nil { - return errorJSON(err, h.codec, w) - } - obj := storage.New() - err = h.codec.DecodeInto(body, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - - // invoke admission control - err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := updater.Update(ctx, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, curry(h.setSelfLink, req)) - return h.finishReq(op, req, w) - - default: - notFound(w, req) - return http.StatusNotFound - } - return http.StatusOK -} - -// createOperation creates an operation to process a channel response. -func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Duration, onReceive func(RESTResult)) *Operation { - op := h.ops.NewOperation(out, onReceive) - op.WaitFor(timeout) - return op -} - -// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an -// Operation to receive the result and returning its ID down the writer. -// Returns the HTTP status code written to the response. -func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) int { - result, complete := op.StatusOrResult() - obj := result.Object - var status int - if complete { - status = http.StatusOK - if result.Created { - status = http.StatusCreated - } - switch stat := obj.(type) { - case *api.Status: - if stat.Code != 0 { - status = stat.Code - } - } - } else { - status = http.StatusAccepted - } - writeJSON(status, h.codec, obj, w) - return status -} diff --git a/pkg/apiserver/resthandler_test.go b/pkg/apiserver/resthandler_test.go deleted file mode 100644 index 2f471beda19..00000000000 --- a/pkg/apiserver/resthandler_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" -) - -func TestFinishReq(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Object: &api.Status{Code: http.StatusNotFound}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - status := &api.Status{} - if err := json.Unmarshal([]byte(resp.Body.String()), status); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusNotFound || status.Code != http.StatusNotFound { - t.Errorf("unexpected status: %#v", status) - } -} - -func TestFinishReqUnwrap(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - obj := &api.Pod{} - if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusCreated || obj.Name != "foo" { - t.Errorf("unexpected object: %#v", obj) - } -} - -func TestFinishReqUnwrapStatus(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Status{Code: http.StatusNotFound}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - obj := &api.Status{} - if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusNotFound || obj.Code != http.StatusNotFound { - t.Errorf("unexpected object: %#v", obj) - } -} diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 9f6578be991..ae3178504d0 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -37,24 +37,24 @@ import ( ) type WatchHandler struct { - storage map[string]RESTStorage - codec runtime.Codec - canonicalPrefix string - selfLinker runtime.SelfLinker - apiRequestInfoResolver *APIRequestInfoResolver + storage map[string]RESTStorage + codec runtime.Codec + prefix string + linker runtime.SelfLinker + info *APIRequestInfoResolver } // setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type. func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { - name, err := h.selfLinker.Name(obj) + name, err := h.linker.Name(obj) if err != nil { return err } newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name) + newURL.Path = path.Join(h.prefix, req.URL.Path, name) newURL.RawQuery = "" newURL.Fragment = "" - return h.selfLinker.SetSelfLink(obj, newURL.String()) + return h.linker.SetSelfLink(obj, newURL.String()) } func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string, err error) { @@ -96,7 +96,7 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) + requestInfo, err := h.info.GetAPIRequestInfo(req) if err != nil { notFound(w, req) httpCode = http.StatusNotFound diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 47b9803f478..ced152954b9 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -17,7 +17,6 @@ limitations under the License. package master import ( - "fmt" "net" "strconv" "time" @@ -92,15 +91,8 @@ func (m *Master) createMasterNamespaceIfNeeded(ns string) error { Namespace: "", }, } - c, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace) - if err != nil { - return err - } - resp := <-c - if _, ok := resp.Object.(*api.Service); ok { - return nil - } - return fmt.Errorf("unexpected response %#v", resp) + _, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace) + return err } // createMasterServiceIfNeeded will create the specified service if it @@ -126,18 +118,8 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I SessionAffinity: api.AffinityTypeNone, }, } - // Kids, don't do this at home: this is a hack. There's no good way to call the business - // logic which lives in the REST object from here. - c, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc) - if err != nil { - return err - } - resp := <-c - if _, ok := resp.Object.(*api.Service); ok { - // If all worked, we get back an *api.Service object. - return nil - } - return fmt.Errorf("unexpected response: %#v", resp.Object) + _, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc) + return err } // ensureEndpointsContain sets the endpoints for the given service. Also removes diff --git a/pkg/registry/binding/rest.go b/pkg/registry/binding/rest.go index 52907852c93..42d7970870e 100644 --- a/pkg/registry/binding/rest.go +++ b/pkg/registry/binding/rest.go @@ -18,9 +18,9 @@ package binding import ( "fmt" + "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) @@ -44,15 +44,13 @@ func (*REST) New() runtime.Object { } // Create attempts to make the assignment indicated by the binding it recieves. -func (b *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (b *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { binding, ok := obj.(*api.Binding) if !ok { return nil, fmt.Errorf("incorrect type: %#v", obj) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := b.registry.ApplyBinding(ctx, binding); err != nil { - return nil, err - } - return &api.Status{Status: api.StatusSuccess}, nil - }), nil + if err := b.registry.ApplyBinding(ctx, binding); err != nil { + return nil, err + } + return &api.Status{Status: api.StatusSuccess, Code: http.StatusCreated}, nil } diff --git a/pkg/registry/binding/rest_test.go b/pkg/registry/binding/rest_test.go index 134d3df5481..5ddac234105 100644 --- a/pkg/registry/binding/rest_test.go +++ b/pkg/registry/binding/rest_test.go @@ -71,22 +71,20 @@ func TestRESTPost(t *testing.T) { } ctx := api.NewContext() b := NewREST(mockRegistry) - resultChan, err := b.Create(ctx, item.b) - if err != nil { + result, err := b.Create(ctx, item.b) + if err != nil && item.err == nil { t.Errorf("Unexpected error %v", err) continue } - var expect *api.Status - if item.err == nil { - expect = &api.Status{Status: api.StatusSuccess} - } else { - expect = &api.Status{ - Status: api.StatusFailure, - Code: http.StatusInternalServerError, - Message: item.err.Error(), - } + if err == nil && item.err != nil { + t.Errorf("Unexpected error %v", err) + continue } - if e, a := expect, (<-resultChan).Object; !reflect.DeepEqual(e, a) { + var expect interface{} + if item.err == nil { + expect = &api.Status{Status: api.StatusSuccess, Code: http.StatusCreated} + } + if e, a := expect, result; !reflect.DeepEqual(e, a) { t.Errorf("%v: expected %#v, got %#v", i, e, a) } } diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index 94c13664ef1..4ea56e49618 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -50,7 +50,7 @@ func NewREST(registry Registry, podLister PodLister) *REST { } // Create registers the given ReplicationController. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -60,20 +60,16 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.CreateController(ctx, controller); err != nil { - err = rest.CheckGeneratedNameError(rest.ReplicationControllers, err, controller) - return apiserver.RESTResult{}, err - } - return rs.registry.GetController(ctx, controller.Name) - }), nil + if err := rs.registry.CreateController(ctx, controller); err != nil { + err = rest.CheckGeneratedNameError(rest.ReplicationControllers, err, controller) + return apiserver.RESTResult{}, err + } + return rs.registry.GetController(ctx, controller.Name) } // Delete asynchronously deletes the ReplicationController specified by its id. -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(ctx, id) - }), nil +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(ctx, id) } // Get obtains the ReplicationController specified by its id. @@ -117,24 +113,23 @@ func (*REST) NewList() runtime.Object { // Update replaces a given ReplicationController instance with an existing // instance in storage.registry. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { controller, ok := obj.(*api.ReplicationController) if !ok { - return nil, fmt.Errorf("not a replication controller: %#v", obj) + return nil, false, fmt.Errorf("not a replication controller: %#v", obj) } if !api.ValidNamespace(ctx, &controller.ObjectMeta) { - return nil, errors.NewConflict("controller", controller.Namespace, fmt.Errorf("Controller.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("controller", controller.Namespace, fmt.Errorf("Controller.Namespace does not match the provided context")) } if errs := validation.ValidateReplicationController(controller); len(errs) > 0 { - return nil, errors.NewInvalid("replicationController", controller.Name, errs) + return nil, false, errors.NewInvalid("replicationController", controller.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateController(ctx, controller) - if err != nil { - return nil, err - } - return rs.registry.GetController(ctx, controller.Name) - }), nil + err := rs.registry.UpdateController(ctx, controller) + if err != nil { + return nil, false, err + } + out, err := rs.registry.GetController(ctx, controller.Name) + return out, false, err } // Watch returns ReplicationController events via a watch.Interface. diff --git a/pkg/registry/controller/rest_test.go b/pkg/registry/controller/rest_test.go index 577b1061415..9616c654210 100644 --- a/pkg/registry/controller/rest_test.go +++ b/pkg/registry/controller/rest_test.go @@ -22,7 +22,6 @@ import ( "io/ioutil" "strings" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -268,23 +267,17 @@ func TestCreateController(t *testing.T) { }, } ctx := api.NewDefaultContext() - channel, err := storage.Create(ctx, controller) + obj, err := storage.Create(ctx, controller) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if err != nil { - t.Errorf("unexpected error: %v", err) + if obj == nil { + t.Errorf("unexpected object") } if !api.HasObjectMetaSystemFieldValues(&controller.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - select { - case <-channel: - // expected case - case <-time.After(time.Millisecond * 100): - t.Error("Unexpected timeout from async channel") - } } // TODO: remove, covered by TestCreate @@ -338,9 +331,9 @@ func TestControllerStorageValidatesUpdate(t *testing.T) { } ctx := api.NewDefaultContext() for _, failureCase := range failureCases { - c, err := storage.Update(ctx, &failureCase) - if c != nil { - t.Errorf("Expected nil channel") + c, created, err := storage.Update(ctx, &failureCase) + if c != nil || created { + t.Errorf("Expected nil object and not created") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) @@ -441,9 +434,9 @@ func TestUpdateControllerWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Update(ctx, controller) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, created, err := storage.Update(ctx, controller) + if obj != nil || created { + t.Error("Expected a nil object, but we got a value or created was true") } if err == nil { t.Errorf("Expected an error, but we didn't get one") diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index 40a7831fc23..147a9df248c 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -21,7 +21,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -59,7 +58,7 @@ func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVer } // Create satisfies the RESTStorage interface. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { endpoints, ok := obj.(*api.Endpoints) if !ok { return nil, fmt.Errorf("not an endpoints: %#v", obj) @@ -72,28 +71,25 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &endpoints.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateEndpoints(ctx, endpoints) - if err != nil { - return nil, err - } - return rs.registry.GetEndpoints(ctx, endpoints.Name) - }), nil + err := rs.registry.UpdateEndpoints(ctx, endpoints) + if err != nil { + return nil, err + } + return rs.registry.GetEndpoints(ctx, endpoints.Name) } // Update satisfies the RESTStorage interface. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { endpoints, ok := obj.(*api.Endpoints) if !ok { - return nil, fmt.Errorf("not an endpoints: %#v", obj) + return nil, false, fmt.Errorf("not an endpoints: %#v", obj) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateEndpoints(ctx, endpoints) - if err != nil { - return nil, err - } - return rs.registry.GetEndpoints(ctx, endpoints.Name) - }), nil + err := rs.registry.UpdateEndpoints(ctx, endpoints) + if err != nil { + return nil, false, err + } + out, err := rs.registry.GetEndpoints(ctx, endpoints.Name) + return out, false, err } // New implements the RESTStorage interface. diff --git a/pkg/registry/event/rest.go b/pkg/registry/event/rest.go index 216c356d032..c9307eb86d8 100644 --- a/pkg/registry/event/rest.go +++ b/pkg/registry/event/rest.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -42,7 +41,7 @@ func NewREST(registry generic.Registry) *REST { } } -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { event, ok := obj.(*api.Event) if !ok { return nil, fmt.Errorf("invalid object type") @@ -57,41 +56,38 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Create(ctx, event.Name, event) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, event.Name) - }), nil + err := rs.registry.Create(ctx, event.Name, event) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, event.Name) } // Update replaces an existing Event instance in storage.registry, with the given instance. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { event, ok := obj.(*api.Event) if !ok { - return nil, fmt.Errorf("not an event object: %#v", obj) + return nil, false, fmt.Errorf("not an event object: %#v", obj) } if api.NamespaceValue(ctx) != "" { if !api.ValidNamespace(ctx, &event.ObjectMeta) { - return nil, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context")) + return nil, false, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context")) } } if errs := validation.ValidateEvent(event); len(errs) > 0 { - return nil, errors.NewInvalid("event", event.Name, errs) + return nil, false, errors.NewInvalid("event", event.Name, errs) } api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, event.Name, event) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, event.Name) - }), nil + err := rs.registry.Update(ctx, event.Name, event) + if err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, event.Name) + return out, false, err } -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, id) if err != nil { return nil, err @@ -100,9 +96,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, if !ok { return nil, fmt.Errorf("invalid object type") } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go index 922603dd9b0..cbcce2fbe11 100644 --- a/pkg/registry/event/rest_test.go +++ b/pkg/registry/event/rest_test.go @@ -89,7 +89,7 @@ func TestRESTCreate(t *testing.T) { if !api.HasObjectMetaSystemFieldValues(&item.event.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - if e, a := item.event, (<-c).Object; !reflect.DeepEqual(e, a) { + if e, a := item.event, c; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } // Ensure we implement the interface @@ -100,11 +100,10 @@ func TestRESTCreate(t *testing.T) { func TestRESTUpdate(t *testing.T) { _, rest := NewTestREST() eventA := testEvent("foo") - c, err := rest.Create(api.NewDefaultContext(), eventA) + _, err := rest.Create(api.NewDefaultContext(), eventA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewDefaultContext(), eventA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -113,11 +112,10 @@ func TestRESTUpdate(t *testing.T) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } eventB := testEvent("bar") - u, err := rest.Update(api.NewDefaultContext(), eventB) + _, _, err = rest.Update(api.NewDefaultContext(), eventB) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-u got2, err := rest.Get(api.NewDefaultContext(), eventB.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -131,16 +129,15 @@ func TestRESTUpdate(t *testing.T) { func TestRESTDelete(t *testing.T) { _, rest := NewTestREST() eventA := testEvent("foo") - c, err := rest.Create(api.NewDefaultContext(), eventA) + _, err := rest.Create(api.NewDefaultContext(), eventA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c - c, err = rest.Delete(api.NewDefaultContext(), eventA.Name) + c, err := rest.Delete(api.NewDefaultContext(), eventA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) } - if stat := (<-c).Object.(*api.Status); stat.Status != api.StatusSuccess { + if stat := c.(*api.Status); stat.Status != api.StatusSuccess { t.Errorf("unexpected status: %v", stat) } } @@ -148,11 +145,10 @@ func TestRESTDelete(t *testing.T) { func TestRESTGet(t *testing.T) { _, rest := NewTestREST() eventA := testEvent("foo") - c, err := rest.Create(api.NewDefaultContext(), eventA) + _, err := rest.Create(api.NewDefaultContext(), eventA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewDefaultContext(), eventA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) diff --git a/pkg/registry/limitrange/rest.go b/pkg/registry/limitrange/rest.go index f5eaaa1a71d..878dfc46ebd 100644 --- a/pkg/registry/limitrange/rest.go +++ b/pkg/registry/limitrange/rest.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,7 +43,7 @@ func NewREST(registry generic.Registry) *REST { } // Create a LimitRange object -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { limitRange, ok := obj.(*api.LimitRange) if !ok { return nil, fmt.Errorf("invalid object type") @@ -63,29 +62,27 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &limitRange.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Create(ctx, limitRange.Name, limitRange) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, limitRange.Name) - }), nil + err := rs.registry.Create(ctx, limitRange.Name, limitRange) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, limitRange.Name) } // Update updates a LimitRange object. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { limitRange, ok := obj.(*api.LimitRange) if !ok { - return nil, fmt.Errorf("invalid object type") + return nil, false, fmt.Errorf("invalid object type") } if !api.ValidNamespace(ctx, &limitRange.ObjectMeta) { - return nil, errors.NewConflict("limitRange", limitRange.Namespace, fmt.Errorf("LimitRange.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("limitRange", limitRange.Namespace, fmt.Errorf("LimitRange.Namespace does not match the provided context")) } oldObj, err := rs.registry.Get(ctx, limitRange.Name) if err != nil { - return nil, err + return nil, false, err } editLimitRange := oldObj.(*api.LimitRange) @@ -97,20 +94,18 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE editLimitRange.Spec = limitRange.Spec if errs := validation.ValidateLimitRange(editLimitRange); len(errs) > 0 { - return nil, errors.NewInvalid("limitRange", editLimitRange.Name, errs) + return nil, false, errors.NewInvalid("limitRange", editLimitRange.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, editLimitRange.Name, editLimitRange) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, editLimitRange.Name) - }), nil + if err := rs.registry.Update(ctx, editLimitRange.Name, editLimitRange); err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, editLimitRange.Name) + return out, false, err } // Delete deletes the LimitRange with the specified name -func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, name) if err != nil { return nil, err @@ -119,9 +114,7 @@ func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResul if !ok { return nil, fmt.Errorf("invalid object type") } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) } // Get gets a LimitRange with the specified name diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 5c0ab637383..cd6db1ef475 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -26,7 +26,6 @@ import ( kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -49,7 +48,7 @@ var ErrDoesNotExist = errors.New("The requested resource does not exist.") var ErrNotHealty = errors.New("The requested minion is not healthy.") // Create satisfies the RESTStorage interface. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { minion, ok := obj.(*api.Node) if !ok { return nil, fmt.Errorf("not a minion: %#v", obj) @@ -59,17 +58,15 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.CreateMinion(ctx, minion); err != nil { - err = rest.CheckGeneratedNameError(rest.Nodes, err, minion) - return nil, err - } - return minion, nil - }), nil + if err := rs.registry.CreateMinion(ctx, minion); err != nil { + err = rest.CheckGeneratedNameError(rest.Nodes, err, minion) + return nil, err + } + return minion, nil } // Delete satisfies the RESTStorage interface. -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { minion, err := rs.registry.GetMinion(ctx, id) if minion == nil { return nil, ErrDoesNotExist @@ -77,9 +74,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, if err != nil { return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id) } // Get satisfies the RESTStorage interface. @@ -108,10 +103,10 @@ func (*REST) NewList() runtime.Object { } // Update satisfies the RESTStorage interface. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { minion, ok := obj.(*api.Node) if !ok { - return nil, fmt.Errorf("not a minion: %#v", obj) + return nil, false, fmt.Errorf("not a minion: %#v", obj) } // This is hacky, but minions don't really have a namespace, but kubectl currently automatically // stuffs one in there. Fix it here temporarily until we fix kubectl @@ -123,7 +118,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE oldMinion, err := rs.registry.GetMinion(ctx, minion.Name) if err != nil { - return nil, err + return nil, false, err } // This is hacky, but minion HostIP has been moved from spec to status since v1beta2. When updating @@ -134,16 +129,14 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } if errs := validation.ValidateMinionUpdate(oldMinion, minion); len(errs) > 0 { - return nil, kerrors.NewInvalid("minion", minion.Name, errs) + return nil, false, kerrors.NewInvalid("minion", minion.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateMinion(ctx, minion) - if err != nil { - return nil, err - } - return rs.registry.GetMinion(ctx, minion.Name) - }), nil + if err := rs.registry.UpdateMinion(ctx, minion); err != nil { + return nil, false, err + } + out, err := rs.registry.GetMinion(ctx, minion.Name) + return out, false, err } // Watch returns Minions events via a watch.Interface. diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index 0e3e2c87b83..fe53157e31a 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -39,27 +39,25 @@ func TestMinionRegistryREST(t *testing.T) { t.Errorf("has unexpected error: %v", err) } - c, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "baz"}}) + obj, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "baz"}}) if err != nil { t.Fatalf("insert failed: %v", err) } - obj := <-c - if !api.HasObjectMetaSystemFieldValues(&obj.Object.(*api.Node).ObjectMeta) { + if !api.HasObjectMetaSystemFieldValues(&obj.(*api.Node).ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - if m, ok := obj.Object.(*api.Node); !ok || m.Name != "baz" { + if m, ok := obj.(*api.Node); !ok || m.Name != "baz" { t.Errorf("insert return value was weird: %#v", obj) } if obj, err := ms.Get(ctx, "baz"); err != nil || obj.(*api.Node).Name != "baz" { t.Errorf("insert didn't actually insert") } - c, err = ms.Delete(ctx, "bar") + obj, err = ms.Delete(ctx, "bar") if err != nil { t.Fatalf("delete failed") } - obj = <-c - if s, ok := obj.Object.(*api.Status); !ok || s.Status != api.StatusSuccess { + if s, ok := obj.(*api.Status); !ok || s.Status != api.StatusSuccess { t.Errorf("delete return value was weird: %#v", obj) } if _, err := ms.Get(ctx, "bar"); !errors.IsNotFound(err) { @@ -103,7 +101,7 @@ func TestMinionRegistryValidUpdate(t *testing.T) { "foo": "bar", "baz": "home", } - if _, err = storage.Update(ctx, minion); err != nil { + if _, _, err = storage.Update(ctx, minion); err != nil { t.Errorf("Unexpected error: %v", err) } } @@ -136,7 +134,7 @@ func TestMinionRegistryValidatesCreate(t *testing.T) { for _, failureCase := range failureCases { c, err := storage.Create(ctx, &failureCase) if c != nil { - t.Errorf("Expected nil channel") + t.Errorf("Expected nil object") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) diff --git a/pkg/registry/namespace/rest.go b/pkg/registry/namespace/rest.go index eeaad26519d..21a6a45bf61 100644 --- a/pkg/registry/namespace/rest.go +++ b/pkg/registry/namespace/rest.go @@ -23,7 +23,6 @@ import ( kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,48 +43,44 @@ func NewREST(registry generic.Registry) *REST { } // Create creates a Namespace object -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { namespace := obj.(*api.Namespace) if err := rest.BeforeCreate(rest.Namespaces, ctx, obj); err != nil { return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.Create(ctx, namespace.Name, namespace); err != nil { - err = rest.CheckGeneratedNameError(rest.Namespaces, err, namespace) - return nil, err - } - return rs.registry.Get(ctx, namespace.Name) - }), nil + if err := rs.registry.Create(ctx, namespace.Name, namespace); err != nil { + err = rest.CheckGeneratedNameError(rest.Namespaces, err, namespace) + return nil, err + } + return rs.registry.Get(ctx, namespace.Name) } // Update updates a Namespace object. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { namespace, ok := obj.(*api.Namespace) if !ok { - return nil, fmt.Errorf("not a namespace: %#v", obj) + return nil, false, fmt.Errorf("not a namespace: %#v", obj) } oldObj, err := rs.registry.Get(ctx, namespace.Name) if err != nil { - return nil, err + return nil, false, err } oldNamespace := oldObj.(*api.Namespace) if errs := validation.ValidateNamespaceUpdate(oldNamespace, namespace); len(errs) > 0 { - return nil, kerrors.NewInvalid("namespace", namespace.Name, errs) + return nil, false, kerrors.NewInvalid("namespace", namespace.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, oldNamespace.Name, oldNamespace) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, oldNamespace.Name) - }), nil + if err := rs.registry.Update(ctx, oldNamespace.Name, oldNamespace); err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, oldNamespace.Name) + return out, false, err } // Delete deletes the Namespace with the specified name -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, id) if err != nil { return nil, err @@ -94,10 +89,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, if !ok { return nil, fmt.Errorf("invalid object type") } - - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { diff --git a/pkg/registry/namespace/rest_test.go b/pkg/registry/namespace/rest_test.go index e053e4dd215..9a694984489 100644 --- a/pkg/registry/namespace/rest_test.go +++ b/pkg/registry/namespace/rest_test.go @@ -78,7 +78,7 @@ func TestRESTCreate(t *testing.T) { if !api.HasObjectMetaSystemFieldValues(&item.namespace.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - if e, a := item.namespace, (<-c).Object; !reflect.DeepEqual(e, a) { + if e, a := item.namespace, c; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } // Ensure we implement the interface @@ -89,11 +89,10 @@ func TestRESTCreate(t *testing.T) { func TestRESTUpdate(t *testing.T) { _, rest := NewTestREST() namespaceA := testNamespace("foo") - c, err := rest.Create(api.NewDefaultContext(), namespaceA) + _, err := rest.Create(api.NewDefaultContext(), namespaceA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewDefaultContext(), namespaceA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -102,11 +101,10 @@ func TestRESTUpdate(t *testing.T) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } namespaceB := testNamespace("foo") - u, err := rest.Update(api.NewDefaultContext(), namespaceB) + _, _, err = rest.Update(api.NewDefaultContext(), namespaceB) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-u got2, err := rest.Get(api.NewDefaultContext(), namespaceB.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -120,16 +118,15 @@ func TestRESTUpdate(t *testing.T) { func TestRESTDelete(t *testing.T) { _, rest := NewTestREST() namespaceA := testNamespace("foo") - c, err := rest.Create(api.NewContext(), namespaceA) + _, err := rest.Create(api.NewContext(), namespaceA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c - c, err = rest.Delete(api.NewContext(), namespaceA.Name) + c, err := rest.Delete(api.NewContext(), namespaceA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) } - if stat := (<-c).Object.(*api.Status); stat.Status != api.StatusSuccess { + if stat := c.(*api.Status); stat.Status != api.StatusSuccess { t.Errorf("unexpected status: %v", stat) } } @@ -137,11 +134,10 @@ func TestRESTDelete(t *testing.T) { func TestRESTGet(t *testing.T) { _, rest := NewTestREST() namespaceA := testNamespace("foo") - c, err := rest.Create(api.NewContext(), namespaceA) + _, err := rest.Create(api.NewContext(), namespaceA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewContext(), namespaceA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index b27290b1fdb..4e7c995ae69 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -25,7 +25,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -55,32 +54,28 @@ func NewREST(config *RESTConfig) *REST { } } -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { pod := obj.(*api.Pod) if err := rest.BeforeCreate(rest.Pods, ctx, obj); err != nil { return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.CreatePod(ctx, pod); err != nil { - err = rest.CheckGeneratedNameError(rest.Pods, err, pod) - return nil, err - } - return rs.registry.GetPod(ctx, pod.Name) - }), nil + if err := rs.registry.CreatePod(ctx, pod); err != nil { + err = rest.CheckGeneratedNameError(rest.Pods, err, pod) + return nil, err + } + return rs.registry.GetPod(ctx, pod.Name) } -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { - return apiserver.MakeAsync(func() (runtime.Object, error) { - namespace, found := api.NamespaceFrom(ctx) - if !found { - return &api.Status{Status: api.StatusFailure}, nil - } - rs.podCache.ClearPodStatus(namespace, id) +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { + namespace, found := api.NamespaceFrom(ctx) + if !found { + return &api.Status{Status: api.StatusFailure}, nil + } + rs.podCache.ClearPodStatus(namespace, id) - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { @@ -167,20 +162,19 @@ func (*REST) NewList() runtime.Object { return &api.PodList{} } -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { pod := obj.(*api.Pod) if !api.ValidNamespace(ctx, &pod.ObjectMeta) { - return nil, errors.NewConflict("pod", pod.Namespace, fmt.Errorf("Pod.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("pod", pod.Namespace, fmt.Errorf("Pod.Namespace does not match the provided context")) } if errs := validation.ValidatePod(pod); len(errs) > 0 { - return nil, errors.NewInvalid("pod", pod.Name, errs) + return nil, false, errors.NewInvalid("pod", pod.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.UpdatePod(ctx, pod); err != nil { - return nil, err - } - return rs.registry.GetPod(ctx, pod.Name) - }), nil + if err := rs.registry.UpdatePod(ctx, pod); err != nil { + return nil, false, err + } + out, err := rs.registry.GetPod(ctx, pod.Name) + return out, false, err } // ResourceLocation returns a URL to which one can send traffic for the specified pod. diff --git a/pkg/registry/pod/rest_test.go b/pkg/registry/pod/rest_test.go index e0b2c1c9031..ff133675499 100644 --- a/pkg/registry/pod/rest_test.go +++ b/pkg/registry/pod/rest_test.go @@ -21,7 +21,6 @@ import ( "reflect" "strings" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -31,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -55,9 +55,8 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) { f.clearedName = name } -func expectApiStatusError(t *testing.T, ch <-chan apiserver.RESTResult, msg string) { - out := <-ch - status, ok := out.Object.(*api.Status) +func expectApiStatusError(t *testing.T, out runtime.Object, msg string) { + status, ok := out.(*api.Status) if !ok { t.Errorf("Expected an api.Status object, was %#v", out) return @@ -67,9 +66,8 @@ func expectApiStatusError(t *testing.T, ch <-chan apiserver.RESTResult, msg stri } } -func expectPod(t *testing.T, ch <-chan apiserver.RESTResult) (*api.Pod, bool) { - out := <-ch - pod, ok := out.Object.(*api.Pod) +func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) { + pod, ok := out.(*api.Pod) if !ok || pod == nil { t.Errorf("Expected an api.Pod object, was %#v", out) return nil, false @@ -94,11 +92,10 @@ func TestCreatePodRegistryError(t *testing.T) { }, } ctx := api.NewDefaultContext() - ch, err := storage.Create(ctx, pod) - if err != nil { + _, err := storage.Create(ctx, pod) + if err != podRegistry.Err { t.Fatalf("unexpected error: %v", err) } - expectApiStatusError(t, ch, podRegistry.Err.Error()) } func TestCreatePodSetsIds(t *testing.T) { @@ -118,11 +115,10 @@ func TestCreatePodSetsIds(t *testing.T) { }, } ctx := api.NewDefaultContext() - ch, err := storage.Create(ctx, pod) - if err != nil { + _, err := storage.Create(ctx, pod) + if err != podRegistry.Err { t.Fatalf("unexpected error: %v", err) } - expectApiStatusError(t, ch, podRegistry.Err.Error()) if len(podRegistry.Pod.Name) == 0 { t.Errorf("Expected pod ID to be set, Got %#v", pod) @@ -149,11 +145,10 @@ func TestCreatePodSetsUID(t *testing.T) { }, } ctx := api.NewDefaultContext() - ch, err := storage.Create(ctx, pod) - if err != nil { + _, err := storage.Create(ctx, pod) + if err != podRegistry.Err { t.Fatalf("unexpected error: %v", err) } - expectApiStatusError(t, ch, podRegistry.Err.Error()) if len(podRegistry.Pod.UID) == 0 { t.Errorf("Expected pod UID to be set, Got %#v", pod) @@ -471,15 +466,12 @@ func TestCreatePod(t *testing.T) { } pod.Name = "foo" ctx := api.NewDefaultContext() - channel, err := storage.Create(ctx, pod) + obj, err := storage.Create(ctx, pod) if err != nil { t.Fatalf("unexpected error: %v", err) } - select { - case <-channel: - // Do nothing, this is expected. - case <-time.After(time.Millisecond * 100): - t.Error("Unexpected timeout on async channel") + if obj == nil { + t.Fatalf("unexpected object: %#v", obj) } if !api.HasObjectMetaSystemFieldValues(&podRegistry.Pod.ObjectMeta) { t.Errorf("Expected ObjectMeta field values were populated") @@ -520,9 +512,9 @@ func TestUpdatePodWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Update(ctx, pod) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, created, err := storage.Update(ctx, pod) + if obj != nil || created { + t.Error("Expected a nil channel, but we got a value or created") } if err == nil { t.Errorf("Expected an error, but we didn't get one") @@ -648,19 +640,12 @@ func TestDeletePod(t *testing.T) { podCache: fakeCache, } ctx := api.NewDefaultContext() - channel, err := storage.Delete(ctx, "foo") + result, err := storage.Delete(ctx, "foo") if err != nil { t.Fatalf("unexpected error: %v", err) } - var result apiserver.RESTResult - select { - case result = <-channel: - // Do nothing, this is expected. - case <-time.After(time.Millisecond * 100): - t.Error("Unexpected timeout on async channel") - } if fakeCache.clearedNamespace != "default" || fakeCache.clearedName != "foo" { - t.Errorf("Unexpeceted cache delete: %s %s %#v", fakeCache.clearedName, fakeCache.clearedNamespace, result.Object) + t.Errorf("Unexpeceted cache delete: %s %s %#v", fakeCache.clearedName, fakeCache.clearedNamespace, result) } } diff --git a/pkg/registry/resourcequota/rest.go b/pkg/registry/resourcequota/rest.go index 3f22be18826..e577e0cc516 100644 --- a/pkg/registry/resourcequota/rest.go +++ b/pkg/registry/resourcequota/rest.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,7 +43,7 @@ func NewREST(registry generic.Registry) *REST { } // Create a ResourceQuota object -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { resourceQuota, ok := obj.(*api.ResourceQuota) if !ok { return nil, fmt.Errorf("invalid object type") @@ -66,29 +65,27 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &resourceQuota.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Create(ctx, resourceQuota.Name, resourceQuota) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, resourceQuota.Name) - }), nil + err := rs.registry.Create(ctx, resourceQuota.Name, resourceQuota) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, resourceQuota.Name) } // Update updates a ResourceQuota object. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { resourceQuota, ok := obj.(*api.ResourceQuota) if !ok { - return nil, fmt.Errorf("invalid object type") + return nil, false, fmt.Errorf("invalid object type") } if !api.ValidNamespace(ctx, &resourceQuota.ObjectMeta) { - return nil, errors.NewConflict("resourceQuota", resourceQuota.Namespace, fmt.Errorf("ResourceQuota.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("resourceQuota", resourceQuota.Namespace, fmt.Errorf("ResourceQuota.Namespace does not match the provided context")) } oldObj, err := rs.registry.Get(ctx, resourceQuota.Name) if err != nil { - return nil, err + return nil, false, err } editResourceQuota := oldObj.(*api.ResourceQuota) @@ -100,20 +97,18 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE editResourceQuota.Spec = resourceQuota.Spec if errs := validation.ValidateResourceQuota(editResourceQuota); len(errs) > 0 { - return nil, errors.NewInvalid("resourceQuota", editResourceQuota.Name, errs) + return nil, false, errors.NewInvalid("resourceQuota", editResourceQuota.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, editResourceQuota.Name, editResourceQuota) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, editResourceQuota.Name) - }), nil + if err := rs.registry.Update(ctx, editResourceQuota.Name, editResourceQuota); err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, editResourceQuota.Name) + return out, false, err } // Delete deletes the ResourceQuota with the specified name -func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, name) if err != nil { return nil, err @@ -122,9 +117,7 @@ func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResul if !ok { return nil, fmt.Errorf("invalid object type") } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) } // Get gets a ResourceQuota with the specified name diff --git a/pkg/registry/resourcequotausage/rest.go b/pkg/registry/resourcequotausage/rest.go index cfd63cf4c7a..b2d9a51454e 100644 --- a/pkg/registry/resourcequotausage/rest.go +++ b/pkg/registry/resourcequotausage/rest.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) @@ -42,15 +41,13 @@ func (*REST) New() runtime.Object { } // Create takes the incoming ResourceQuotaUsage and applies the latest status atomically to a ResourceQuota -func (b *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (b *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { resourceQuotaUsage, ok := obj.(*api.ResourceQuotaUsage) if !ok { return nil, fmt.Errorf("incorrect type: %#v", obj) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := b.registry.ApplyStatus(ctx, resourceQuotaUsage); err != nil { - return nil, err - } - return &api.Status{Status: api.StatusSuccess}, nil - }), nil + if err := b.registry.ApplyStatus(ctx, resourceQuotaUsage); err != nil { + return nil, err + } + return &api.Status{Status: api.StatusSuccess}, nil } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 638cf592c84..a2906179219 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -25,7 +25,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" @@ -80,7 +79,7 @@ func reloadIPsFromStorage(ipa *ipAllocator, registry Registry) { } } -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { service := obj.(*api.Service) if err := rest.BeforeCreate(rest.Services, ctx, obj); err != nil { @@ -102,61 +101,59 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } } - return apiserver.MakeAsync(func() (runtime.Object, error) { - // TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers - // correctly no matter what http operations happen. - if service.Spec.CreateExternalLoadBalancer { - if rs.cloud == nil { - return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") - } - if service.Spec.Protocol != api.ProtocolTCP { - // TODO: Support UDP here too. - return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.") - } - balancer, ok := rs.cloud.TCPLoadBalancer() - if !ok { - return nil, fmt.Errorf("the cloud provider does not support external TCP load balancers.") - } - zones, ok := rs.cloud.Zones() - if !ok { - return nil, fmt.Errorf("the cloud provider does not support zone enumeration.") - } - hosts, err := rs.machines.ListMinions(ctx) - if err != nil { - return nil, err - } - zone, err := zones.GetZone() - if err != nil { - return nil, err - } - // TODO: We should be able to rely on valid input, and not do defaulting here. - var affinityType api.AffinityType = service.Spec.SessionAffinity - if affinityType == "" { - affinityType = api.AffinityTypeNone - } - if len(service.Spec.PublicIPs) > 0 { - for _, publicIP := range service.Spec.PublicIPs { - _, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts), affinityType) - if err != nil { - // TODO: have to roll-back any successful calls. - return nil, err - } - } - } else { - ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts), affinityType) - if err != nil { - return nil, err - } - service.Spec.PublicIPs = []string{ip.String()} - } + // TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers + // correctly no matter what http operations happen. + if service.Spec.CreateExternalLoadBalancer { + if rs.cloud == nil { + return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") } - - if err := rs.registry.CreateService(ctx, service); err != nil { - err = rest.CheckGeneratedNameError(rest.Services, err, service) + if service.Spec.Protocol != api.ProtocolTCP { + // TODO: Support UDP here too. + return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.") + } + balancer, ok := rs.cloud.TCPLoadBalancer() + if !ok { + return nil, fmt.Errorf("the cloud provider does not support external TCP load balancers.") + } + zones, ok := rs.cloud.Zones() + if !ok { + return nil, fmt.Errorf("the cloud provider does not support zone enumeration.") + } + hosts, err := rs.machines.ListMinions(ctx) + if err != nil { return nil, err } - return rs.registry.GetService(ctx, service.Name) - }), nil + zone, err := zones.GetZone() + if err != nil { + return nil, err + } + // TODO: We should be able to rely on valid input, and not do defaulting here. + var affinityType api.AffinityType = service.Spec.SessionAffinity + if affinityType == "" { + affinityType = api.AffinityTypeNone + } + if len(service.Spec.PublicIPs) > 0 { + for _, publicIP := range service.Spec.PublicIPs { + _, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts), affinityType) + if err != nil { + // TODO: have to roll-back any successful calls. + return nil, err + } + } + } else { + ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts), affinityType) + if err != nil { + return nil, err + } + service.Spec.PublicIPs = []string{ip.String()} + } + } + + if err := rs.registry.CreateService(ctx, service); err != nil { + err = rest.CheckGeneratedNameError(rest.Services, err, service) + return nil, err + } + return rs.registry.GetService(ctx, service.Name) } func hostsFromMinionList(list *api.NodeList) []string { @@ -167,16 +164,14 @@ func hostsFromMinionList(list *api.NodeList) []string { return result } -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { service, err := rs.registry.GetService(ctx, id) if err != nil { return nil, err } rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP)) - return apiserver.MakeAsync(func() (runtime.Object, error) { - rs.deleteExternalLoadBalancer(service) - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) - }), nil + rs.deleteExternalLoadBalancer(service) + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { @@ -217,30 +212,29 @@ func (*REST) NewList() runtime.Object { return &api.Service{} } -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { service := obj.(*api.Service) if !api.ValidNamespace(ctx, &service.ObjectMeta) { - return nil, errors.NewConflict("service", service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("service", service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context")) } oldService, err := rs.registry.GetService(ctx, service.Name) if err != nil { - return nil, err + return nil, false, err } // Copy over non-user fields // TODO: make this a merge function if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 { - return nil, errors.NewInvalid("service", service.Name, errs) + return nil, false, errors.NewInvalid("service", service.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - // TODO: check to see if external load balancer status changed - err = rs.registry.UpdateService(ctx, service) - if err != nil { - return nil, err - } - return rs.registry.GetService(ctx, service.Name) - }), nil + // TODO: check to see if external load balancer status changed + err = rs.registry.UpdateService(ctx, service) + if err != nil { + return nil, false, err + } + out, err := rs.registry.GetService(ctx, service.Name) + return out, false, err } // ResourceLocation returns a URL to which one can send traffic for the specified service. diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 8d228844734..60a3096015f 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -56,9 +56,8 @@ func TestServiceRegistryCreate(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := storage.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := storage.Create(ctx, svc) + created_service := created_svc.(*api.Service) if !api.HasObjectMetaSystemFieldValues(&created_service.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } @@ -109,7 +108,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) { for _, failureCase := range failureCases { c, err := storage.Create(ctx, &failureCase) if c != nil { - t.Errorf("Expected nil channel") + t.Errorf("Expected nil object") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) @@ -129,7 +128,7 @@ func TestServiceRegistryUpdate(t *testing.T) { }, }) storage := NewREST(registry, nil, nil, makeIPNet(t)) - c, err := storage.Update(ctx, &api.Service{ + updated_svc, created, err := storage.Update(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ Port: 6502, @@ -141,11 +140,13 @@ func TestServiceRegistryUpdate(t *testing.T) { if err != nil { t.Fatalf("Expected no error: %v", err) } - if c == nil { - t.Errorf("Expected non-nil channel") + if updated_svc == nil { + t.Errorf("Expected non-nil object") } - updated_svc := <-c - updated_service := updated_svc.Object.(*api.Service) + if created { + t.Errorf("expected not created") + } + updated_service := updated_svc.(*api.Service) if updated_service.Name != "foo" { t.Errorf("Expected foo, but got %v", updated_service.Name) } @@ -186,9 +187,9 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { }, } for _, failureCase := range failureCases { - c, err := storage.Update(ctx, &failureCase) - if c != nil { - t.Errorf("Expected nil channel") + c, created, err := storage.Update(ctx, &failureCase) + if c != nil || created { + t.Errorf("Expected nil object or created false") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) @@ -212,8 +213,7 @@ func TestServiceRegistryExternalService(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }, } - c, _ := storage.Create(ctx, svc) - <-c + storage.Create(ctx, svc) if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -244,8 +244,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := storage.Create(ctx, svc) - <-c + storage.Create(ctx, svc) if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -269,8 +268,7 @@ func TestServiceRegistryDelete(t *testing.T) { }, } registry.CreateService(ctx, svc) - c, _ := storage.Delete(ctx, svc.Name) - <-c + storage.Delete(ctx, svc.Name) if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -295,8 +293,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { }, } registry.CreateService(ctx, svc) - c, _ := storage.Delete(ctx, svc.Name) - <-c + storage.Delete(ctx, svc.Name) if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -413,9 +410,8 @@ func TestServiceRegistryIPAllocation(t *testing.T) { }, } ctx := api.NewDefaultContext() - c1, _ := rest.Create(ctx, svc1) - created_svc1 := <-c1 - created_service_1 := created_svc1.Object.(*api.Service) + created_svc1, _ := rest.Create(ctx, svc1) + created_service_1 := created_svc1.(*api.Service) if created_service_1.Name != "foo" { t.Errorf("Expected foo, but got %v", created_service_1.Name) } @@ -432,9 +428,8 @@ func TestServiceRegistryIPAllocation(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }} ctx = api.NewDefaultContext() - c2, _ := rest.Create(ctx, svc2) - created_svc2 := <-c2 - created_service_2 := created_svc2.Object.(*api.Service) + created_svc2, _ := rest.Create(ctx, svc2) + created_service_2 := created_svc2.(*api.Service) if created_service_2.Name != "bar" { t.Errorf("Expected bar, but got %v", created_service_2.Name) } @@ -453,9 +448,8 @@ func TestServiceRegistryIPAllocation(t *testing.T) { }, } ctx = api.NewDefaultContext() - c3, _ := rest.Create(ctx, svc3) - created_svc3 := <-c3 - created_service_3 := created_svc3.Object.(*api.Service) + created_svc3, _ := rest.Create(ctx, svc3) + created_service_3 := created_svc3.(*api.Service) if created_service_3.Spec.PortalIP != "1.2.3.93" { // specific IP t.Errorf("Unexpected PortalIP: %s", created_service_3.Spec.PortalIP) } @@ -478,9 +472,8 @@ func TestServiceRegistryIPReallocation(t *testing.T) { }, } ctx := api.NewDefaultContext() - c1, _ := rest.Create(ctx, svc1) - created_svc1 := <-c1 - created_service_1 := created_svc1.Object.(*api.Service) + created_svc1, _ := rest.Create(ctx, svc1) + created_service_1 := created_svc1.(*api.Service) if created_service_1.Name != "foo" { t.Errorf("Expected foo, but got %v", created_service_1.Name) } @@ -488,8 +481,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) { t.Errorf("Unexpected PortalIP: %s", created_service_1.Spec.PortalIP) } - c, _ := rest.Delete(ctx, created_service_1.Name) - <-c + rest.Delete(ctx, created_service_1.Name) svc2 := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "bar"}, @@ -501,9 +493,8 @@ func TestServiceRegistryIPReallocation(t *testing.T) { }, } ctx = api.NewDefaultContext() - c2, _ := rest.Create(ctx, svc2) - created_svc2 := <-c2 - created_service_2 := created_svc2.Object.(*api.Service) + created_svc2, _ := rest.Create(ctx, svc2) + created_service_2 := created_svc2.(*api.Service) if created_service_2.Name != "bar" { t.Errorf("Expected bar, but got %v", created_service_2.Name) } @@ -529,9 +520,8 @@ func TestServiceRegistryIPUpdate(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := rest.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := rest.Create(ctx, svc) + created_service := created_svc.(*api.Service) if created_service.Spec.Port != 6502 { t.Errorf("Expected port 6502, but got %v", created_service.Spec.Port) } @@ -543,9 +533,8 @@ func TestServiceRegistryIPUpdate(t *testing.T) { *update = *created_service update.Spec.Port = 6503 - c, _ = rest.Update(ctx, update) - updated_svc := <-c - updated_service := updated_svc.Object.(*api.Service) + updated_svc, _, _ := rest.Update(ctx, update) + updated_service := updated_svc.(*api.Service) if updated_service.Spec.Port != 6503 { t.Errorf("Expected port 6503, but got %v", updated_service.Spec.Port) } @@ -554,7 +543,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { update.Spec.Port = 6503 update.Spec.PortalIP = "1.2.3.76" // error - _, err := rest.Update(ctx, update) + _, _, err := rest.Update(ctx, update) if err == nil || !errors.IsInvalid(err) { t.Error("Unexpected error type: %v", err) } @@ -578,9 +567,8 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := rest.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := rest.Create(ctx, svc) + created_service := created_svc.(*api.Service) if created_service.Spec.Port != 6502 { t.Errorf("Expected port 6502, but got %v", created_service.Spec.Port) } @@ -591,7 +579,7 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { update := new(api.Service) *update = *created_service - _, err := rest.Update(ctx, update) + _, _, err := rest.Update(ctx, update) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -614,8 +602,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := rest1.Create(ctx, svc) - <-c + rest1.Create(ctx, svc) svc = &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -625,8 +612,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }, } - c, _ = rest1.Create(ctx, svc) - <-c + rest1.Create(ctx, svc) // This will reload from storage, finding the previous 2 rest2 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) @@ -641,9 +627,8 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }, } - c, _ = rest2.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := rest2.Create(ctx, svc) + created_service := created_svc.(*api.Service) if created_service.Spec.PortalIP != "1.2.3.3" { t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP) } @@ -657,9 +642,9 @@ func TestCreateServiceWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Create(ctx, service) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, err := storage.Create(ctx, service) + if obj != nil { + t.Error("Expected a nil object, but we got a value") } if err == nil { t.Errorf("Expected an error, but we didn't get one") @@ -675,9 +660,9 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Update(ctx, service) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, created, err := storage.Update(ctx, service) + if obj != nil || created { + t.Error("Expected a nil object, but we got a value or created was true") } if err == nil { t.Errorf("Expected an error, but we didn't get one") diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 397c2c4b020..4f761589bd9 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -78,6 +78,20 @@ var aPod string = ` }%s } ` +var aPodInBar string = ` +{ + "kind": "Pod", + "apiVersion": "v1beta1", + "id": "a", + "desiredState": { + "manifest": { + "version": "v1beta1", + "id": "a", + "containers": [{ "name": "foo", "image": "bar/foo", }] + } + }%s +} +` var aRC string = ` { "kind": "ReplicationController", @@ -126,7 +140,6 @@ var aEvent string = ` { "kind": "Event", "apiVersion": "v1beta1", - "namespace": "default", "id": "a", "involvedObject": { "kind": "Minion", @@ -162,6 +175,7 @@ var timeoutFlag = "?timeout=60s" // Requests to try. Each one should be forbidden or not forbidden // depending on the authentication and authorization setup of the master. var code200 = map[int]bool{200: true} +var code201 = map[int]bool{201: true} var code400 = map[int]bool{400: true} var code403 = map[int]bool{403: true} var code404 = map[int]bool{404: true} @@ -184,7 +198,7 @@ func getTestRequests() []struct { }{ // Normal methods on pods {"GET", "/api/v1beta1/pods", "", code200}, - {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code201}, {"PUT", "/api/v1beta1/pods/a" + timeoutFlag, aPod, code200}, {"GET", "/api/v1beta1/pods", "", code200}, {"GET", "/api/v1beta1/pods/a", "", code200}, @@ -204,7 +218,7 @@ func getTestRequests() []struct { // Normal methods on services {"GET", "/api/v1beta1/services", "", code200}, - {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200}, + {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code201}, {"PUT", "/api/v1beta1/services/a" + timeoutFlag, aService, code200}, {"GET", "/api/v1beta1/services", "", code200}, {"GET", "/api/v1beta1/services/a", "", code200}, @@ -212,7 +226,7 @@ func getTestRequests() []struct { // Normal methods on replicationControllers {"GET", "/api/v1beta1/replicationControllers", "", code200}, - {"POST", "/api/v1beta1/replicationControllers" + timeoutFlag, aRC, code200}, + {"POST", "/api/v1beta1/replicationControllers" + timeoutFlag, aRC, code201}, {"PUT", "/api/v1beta1/replicationControllers/a" + timeoutFlag, aRC, code200}, {"GET", "/api/v1beta1/replicationControllers", "", code200}, {"GET", "/api/v1beta1/replicationControllers/a", "", code200}, @@ -220,7 +234,7 @@ func getTestRequests() []struct { // Normal methods on endpoints {"GET", "/api/v1beta1/endpoints", "", code200}, - {"POST", "/api/v1beta1/endpoints" + timeoutFlag, aEndpoints, code200}, + {"POST", "/api/v1beta1/endpoints" + timeoutFlag, aEndpoints, code201}, {"PUT", "/api/v1beta1/endpoints/a" + timeoutFlag, aEndpoints, code200}, {"GET", "/api/v1beta1/endpoints", "", code200}, {"GET", "/api/v1beta1/endpoints/a", "", code200}, @@ -228,7 +242,7 @@ func getTestRequests() []struct { // Normal methods on minions {"GET", "/api/v1beta1/minions", "", code200}, - {"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code200}, + {"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code201}, {"PUT", "/api/v1beta1/minions/a" + timeoutFlag, aMinion, code409}, // See #2115 about why 409 {"GET", "/api/v1beta1/minions", "", code200}, {"GET", "/api/v1beta1/minions/a", "", code200}, @@ -236,7 +250,7 @@ func getTestRequests() []struct { // Normal methods on events {"GET", "/api/v1beta1/events", "", code200}, - {"POST", "/api/v1beta1/events" + timeoutFlag, aEvent, code200}, + {"POST", "/api/v1beta1/events" + timeoutFlag, aEvent, code201}, {"PUT", "/api/v1beta1/events/a" + timeoutFlag, aEvent, code200}, {"GET", "/api/v1beta1/events", "", code200}, {"GET", "/api/v1beta1/events", "", code200}, @@ -245,8 +259,8 @@ func getTestRequests() []struct { // Normal methods on bindings {"GET", "/api/v1beta1/bindings", "", code405}, // Bindings are write-only - {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code200}, // Need a pod to bind or you get a 404 - {"POST", "/api/v1beta1/bindings" + timeoutFlag, aBinding, code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code201}, // Need a pod to bind or you get a 404 + {"POST", "/api/v1beta1/bindings" + timeoutFlag, aBinding, code201}, {"PUT", "/api/v1beta1/bindings/a" + timeoutFlag, aBinding, code404}, {"GET", "/api/v1beta1/bindings", "", code405}, {"GET", "/api/v1beta1/bindings/a", "", code404}, // No bindings instances @@ -316,14 +330,16 @@ func TestAuthModeAlwaysAllow(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := "default" + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) @@ -483,14 +499,16 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := "default" + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) @@ -705,24 +723,25 @@ func TestNamespaceAuthorization(t *testing.T) { requests := []struct { verb string URL string + namespace string body string statusCodes map[int]bool // allowed status codes. }{ - {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", aPod, code200}, - {"GET", "/api/v1beta1/pods?namespace=foo", "", code200}, - {"GET", "/api/v1beta1/pods/a?namespace=foo", "", code200}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "", code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", "foo", aPod, code201}, + {"GET", "/api/v1beta1/pods?namespace=foo", "foo", "", code200}, + {"GET", "/api/v1beta1/pods/a?namespace=foo", "foo", "", code200}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "foo", "", code200}, - {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=bar", aPod, code403}, - {"GET", "/api/v1beta1/pods?namespace=bar", "", code403}, - {"GET", "/api/v1beta1/pods/a?namespace=bar", "", code403}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=bar", "", code403}, + {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=bar", "bar", aPod, code403}, + {"GET", "/api/v1beta1/pods?namespace=bar", "bar", "", code403}, + {"GET", "/api/v1beta1/pods/a?namespace=bar", "bar", "", code403}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=bar", "bar", "", code403}, - {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code403}, - {"GET", "/api/v1beta1/pods", "", code403}, - {"GET", "/api/v1beta1/pods/a", "", code403}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", code403}, + {"POST", "/api/v1beta1/pods" + timeoutFlag, "", aPod, code403}, + {"GET", "/api/v1beta1/pods", "", "", code403}, + {"GET", "/api/v1beta1/pods/a", "", "", code403}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", "", code403}, } for _, r := range requests { @@ -730,14 +749,19 @@ func TestNamespaceAuthorization(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := r.namespace + if len(namespace) == 0 { + namespace = "default" + } + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) @@ -815,7 +839,7 @@ func TestKindAuthorization(t *testing.T) { body string statusCodes map[int]bool // allowed status codes. }{ - {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200}, + {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code201}, {"GET", "/api/v1beta1/services", "", code200}, {"GET", "/api/v1beta1/services/a", "", code200}, {"DELETE", "/api/v1beta1/services/a" + timeoutFlag, "", code200},