Merge pull request #38875 from sttts/sttts-move-pkg-apiserver

Automatic merge from submit-queue

pkg/apiserver: split up monolithic package

**Based on** https://github.com/kubernetes/kubernetes/pull/38191

This is a first step to integrate pkg/apiserver with pkg/genericapiserver into a common package structure. For this
- pkg/apiserver is cleaned up from code which does not belong there, 
- split up into pkg/apiserver, pkg/apiserver/handlers{,/negotation,/helpers,/errors}
This commit is contained in:
Kubernetes Submit Queue 2016-12-19 17:26:55 -08:00 committed by GitHub
commit e3a4a3675b
46 changed files with 1152 additions and 820 deletions

View File

@ -30,8 +30,8 @@ go_library(
"//pkg/api/errors:go_default_library",
"//pkg/api/rest:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apiserver:go_default_library",
"//pkg/apiserver/filters:go_default_library",
"//pkg/apiserver/handlers/responsewriters:go_default_library",
"//pkg/auth/handlers:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",

View File

@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
v1listers "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
@ -187,7 +187,7 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
apiServices, err := r.lister.List(labels.Everything())
if statusErr, ok := err.(*apierrors.StatusError); ok && err != nil {
apiserver.WriteRawJSON(int(statusErr.Status().Code), statusErr.Status(), w)
responsewriters.WriteRawJSON(int(statusErr.Status().Code), statusErr.Status(), w)
return
}
if err != nil {

View File

@ -22,7 +22,7 @@ import (
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/transport"
genericrest "k8s.io/kubernetes/pkg/registry/generic/rest"
@ -154,7 +154,7 @@ type responder struct {
// TODO this should properly handle content type negotiation
// if the caller asked for protobuf and you write JSON bad things happen.
func (r *responder) Object(statusCode int, obj runtime.Object) {
apiserver.WriteRawJSON(statusCode, obj, r.w)
responsewriters.WriteRawJSON(statusCode, obj, r.w)
}
func (r *responder) Error(err error) {

View File

@ -26,7 +26,7 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apiserver/request"
apiserverrequest "k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/util/sets"
@ -62,13 +62,13 @@ func (m *fakeRequestContextMapper) Get(req *http.Request) (api.Context, bool) {
ctx = api.WithUser(ctx, m.user)
}
resolver := &request.RequestInfoFactory{
resolver := &apiserverrequest.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
info, err := resolver.NewRequestInfo(req)
if err == nil {
ctx = request.WithRequestInfo(ctx, info)
ctx = apiserverrequest.WithRequestInfo(ctx, info)
}
return ctx, true

View File

@ -95,6 +95,7 @@ pkg/apis/rbac/install
pkg/apis/storage/install
pkg/apis/storage/validation
pkg/apiserver/audit
pkg/apiserver/handlers/responsewriters
pkg/apiserver/openapi
pkg/auth/authenticator
pkg/auth/authorizer/union

View File

@ -11,15 +11,11 @@ load(
go_library(
name = "go_default_library",
srcs = [
"api_installer.go",
"apiserver.go",
"discovery.go",
"doc.go",
"errors.go",
"negotiate.go",
"proxy.go",
"resthandler.go",
"serviceerror.go",
"watch.go",
"groupversion.go",
"installer.go",
],
tags = ["automanaged"],
deps = [
@ -30,46 +26,25 @@ go_library(
"//pkg/api/rest:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apiserver/handlers:go_default_library",
"//pkg/apiserver/handlers/negotiation:go_default_library",
"//pkg/apiserver/handlers/responsewriters:go_default_library",
"//pkg/apiserver/metrics:go_default_library",
"//pkg/apiserver/request:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/httplog:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/runtime/serializer/streaming:go_default_library",
"//pkg/storage:go_default_library",
"//pkg/util:go_default_library",
"//pkg/util/errors:go_default_library",
"//pkg/util/flushwriter:go_default_library",
"//pkg/util/httpstream:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/proxy:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/util/version:go_default_library",
"//pkg/util/wsstream:go_default_library",
"//pkg/watch:go_default_library",
"//pkg/watch/versioned:go_default_library",
"//vendor:bitbucket.org/ww/goautoneg",
"//vendor:github.com/emicklei/go-restful",
"//vendor:github.com/evanphx/json-patch",
"//vendor:github.com/golang/glog",
"//vendor:golang.org/x/net/websocket",
],
)
go_test(
name = "go_default_test",
srcs = [
"api_installer_test.go",
"apiserver_test.go",
"errors_test.go",
"negotiate_test.go",
"installer_test.go",
"proxy_test.go",
"resthandler_test.go",
"watch_test.go",
],
library = "go_default_library",
@ -80,11 +55,12 @@ go_test(
"//pkg/api/errors:go_default_library",
"//pkg/api/meta:go_default_library",
"//pkg/api/rest:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/api/testing:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apiserver/filters:go_default_library",
"//pkg/apiserver/handlers:go_default_library",
"//pkg/apiserver/handlers/responsewriters:go_default_library",
"//pkg/apiserver/request:go_default_library",
"//pkg/apiserver/testing:go_default_library",
"//pkg/fields:go_default_library",
@ -92,18 +68,15 @@ go_test(
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/runtime/serializer/streaming:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/diff:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/watch:go_default_library",
"//pkg/watch/versioned:go_default_library",
"//plugin/pkg/admission/admit:go_default_library",
"//plugin/pkg/admission/deny:go_default_library",
"//vendor:github.com/emicklei/go-restful",
"//vendor:github.com/evanphx/json-patch",
"//vendor:golang.org/x/net/websocket",
],
)

View File

@ -17,502 +17,9 @@ limitations under the License.
package apiserver
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"path"
rt "runtime"
"strconv"
"strings"
"time"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/metrics"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flushwriter"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
utilversion "k8s.io/kubernetes/pkg/util/version"
"k8s.io/kubernetes/pkg/util/wsstream"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
)
func init() {
metrics.Register()
}
type APIResourceLister interface {
ListAPIResources() []metav1.APIResource
}
// APIGroupVersion is a helper for exposing rest.Storage objects as http.Handlers via go-restful
// It handles URLs of the form:
// /${storage_key}[/${object_name}]
// Where 'storage_key' points to a rest.Storage object stored in storage.
// This object should contain all parameterization necessary for running a particular API version
type APIGroupVersion struct {
Storage map[string]rest.Storage
Root string
// GroupVersion is the external group version
GroupVersion schema.GroupVersion
// OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
// schema like api.Status, api.DeleteOptions, and api.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
// empty, defaults to GroupVersion.
OptionsExternalVersion *schema.GroupVersion
Mapper meta.RESTMapper
// Serializer is used to determine how to convert responses from API methods into bytes to send over
// the wire.
Serializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Copier runtime.ObjectCopier
Linker runtime.SelfLinker
Admit admission.Interface
Context api.RequestContextMapper
MinRequestTimeout time.Duration
// SubresourceGroupVersionKind contains the GroupVersionKind overrides for each subresource that is
// accessible from this API group version. The GroupVersionKind is that of the external version of
// the subresource. The key of this map should be the path of the subresource. The keys here should
// match the keys in the Storage map above for subresources.
SubresourceGroupVersionKind map[string]schema.GroupVersionKind
// ResourceLister is an interface that knows how to list resources
// for this API Group.
ResourceLister APIResourceLister
}
// staticLister implements the APIResourceLister interface
type staticLister struct {
list []metav1.APIResource
}
func (s staticLister) ListAPIResources() []metav1.APIResource {
return s.list
}
var _ APIResourceLister = &staticLister{}
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
installer := g.newInstaller()
ws := installer.NewWebService()
apiResources, registrationErrors := installer.Install(ws)
lister := g.ResourceLister
if lister == nil {
lister = staticLister{apiResources}
}
AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}
// UpdateREST registers the REST handlers for this APIGroupVersion to an existing web service
// in the restful Container. It will use the prefix (root/version) to find the existing
// web service. If a web service does not exist within the container to support the prefix
// this method will return an error.
func (g *APIGroupVersion) UpdateREST(container *restful.Container) error {
installer := g.newInstaller()
var ws *restful.WebService = nil
for i, s := range container.RegisteredWebServices() {
if s.RootPath() == installer.prefix {
ws = container.RegisteredWebServices()[i]
break
}
}
if ws == nil {
return apierrors.NewInternalError(fmt.Errorf("unable to find an existing webservice for prefix %s", installer.prefix))
}
apiResources, registrationErrors := installer.Install(ws)
lister := g.ResourceLister
if lister == nil {
lister = staticLister{apiResources}
}
AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)
return utilerrors.NewAggregate(registrationErrors)
}
// newInstaller is a helper to create the installer. Used by InstallREST and UpdateREST.
func (g *APIGroupVersion) newInstaller() *APIInstaller {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
return installer
}
// TODO: needs to perform response type negotiation, this is probably the wrong way to recover panics
func InstallRecoverHandler(s runtime.NegotiatedSerializer, container *restful.Container) {
container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
}
//TODO: Unify with RecoverPanics?
func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{}, w http.ResponseWriter) {
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason))
for i := 2; ; i += 1 {
_, file, line, ok := rt.Caller(i)
if !ok {
break
}
buffer.WriteString(fmt.Sprintf(" %s:%d\r\n", file, line))
}
glog.Errorln(buffer.String())
headers := http.Header{}
if ct := w.Header().Get("Content-Type"); len(ct) > 0 {
headers.Set("Accept", ct)
}
errorNegotiated(apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", api.Resource(""), "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers})
}
// Adds a service to return the supported api versions at the legacy /api.
func AddApiWebService(s runtime.NegotiatedSerializer, container *restful.Container, apiPrefix string, getAPIVersionsFunc func(req *restful.Request) *metav1.APIVersions) {
// TODO: InstallREST should register each version automatically
// Because in release 1.1, /api returns response with empty APIVersion, we
// use StripVersionNegotiatedSerializer to keep the response backwards
// compatible.
mediaTypes, _ := mediaTypesForSerializer(s)
ss := StripVersionNegotiatedSerializer{s}
versionHandler := APIVersionHandler(ss, getAPIVersionsFunc)
ws := new(restful.WebService)
ws.Path(apiPrefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(versionHandler).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIVersions{}))
container.Add(ws)
}
// stripVersionEncoder strips APIVersion field from the encoding output. It's
// used to keep the responses at the discovery endpoints backward compatible
// with release-1.1, when the responses have empty APIVersion.
type stripVersionEncoder struct {
encoder runtime.Encoder
serializer runtime.Serializer
}
func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error {
buf := bytes.NewBuffer([]byte{})
err := c.encoder.Encode(obj, buf)
if err != nil {
return err
}
roundTrippedObj, gvk, err := c.serializer.Decode(buf.Bytes(), nil, nil)
if err != nil {
return err
}
gvk.Group = ""
gvk.Version = ""
roundTrippedObj.GetObjectKind().SetGroupVersionKind(*gvk)
return c.serializer.Encode(roundTrippedObj, w)
}
// StripVersionNegotiatedSerializer will return stripVersionEncoder when
// EncoderForVersion is called. See comments for stripVersionEncoder.
type StripVersionNegotiatedSerializer struct {
runtime.NegotiatedSerializer
}
func (n StripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
serializer, ok := encoder.(runtime.Serializer)
if !ok {
// The stripVersionEncoder needs both an encoder and decoder, but is called from a context that doesn't have access to the
// decoder. We do a best effort cast here (since this code path is only for backwards compatibility) to get access to the caller's
// decoder.
panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder))
}
versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv)
return stripVersionEncoder{versioned, serializer}
}
func keepUnversioned(group string) bool {
return group == "" || group == "extensions"
}
// NewApisWebService returns a webservice serving the available api version under /apis.
func NewApisWebService(s runtime.NegotiatedSerializer, apiPrefix string, f func(req *restful.Request) []metav1.APIGroup) *restful.WebService {
// Because in release 1.1, /apis returns response with empty APIVersion, we
// use StripVersionNegotiatedSerializer to keep the response backwards
// compatible.
ss := StripVersionNegotiatedSerializer{s}
mediaTypes, _ := mediaTypesForSerializer(s)
rootAPIHandler := RootAPIHandler(ss, f)
ws := new(restful.WebService)
ws.Path(apiPrefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(rootAPIHandler).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIGroupList{}))
return ws
}
// NewGroupWebService returns a webservice serving the supported versions, preferred version, and name
// of a group. E.g., such a web service will be registered at /apis/extensions.
func NewGroupWebService(s runtime.NegotiatedSerializer, path string, group metav1.APIGroup) *restful.WebService {
ss := s
if keepUnversioned(group.Name) {
// Because in release 1.1, /apis/extensions returns response with empty
// APIVersion, we use StripVersionNegotiatedSerializer to keep the
// response backwards compatible.
ss = StripVersionNegotiatedSerializer{s}
}
mediaTypes, _ := mediaTypesForSerializer(s)
groupHandler := GroupHandler(ss, group)
ws := new(restful.WebService)
ws.Path(path)
ws.Doc("get information of a group")
ws.Route(ws.GET("/").To(groupHandler).
Doc("get information of a group").
Operation("getAPIGroup").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIGroup{}))
return ws
}
// Adds a service to return the supported resources, E.g., a such web service
// will be registered at /apis/extensions/v1.
func AddSupportedResourcesWebService(s runtime.NegotiatedSerializer, ws *restful.WebService, groupVersion schema.GroupVersion, lister APIResourceLister) {
ss := s
if keepUnversioned(groupVersion.Group) {
// Because in release 1.1, /apis/extensions/v1beta1 returns response
// with empty APIVersion, we use StripVersionNegotiatedSerializer to
// keep the response backwards compatible.
ss = StripVersionNegotiatedSerializer{s}
}
mediaTypes, _ := mediaTypesForSerializer(s)
resourceHandler := SupportedResourcesHandler(ss, groupVersion, lister)
ws.Route(ws.GET("/").To(resourceHandler).
Doc("get available resources").
Operation("getAPIResources").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIResourceList{}))
}
// APIVersionHandler returns a handler which will list the provided versions as available.
func APIVersionHandler(s runtime.NegotiatedSerializer, getAPIVersionsFunc func(req *restful.Request) *metav1.APIVersions) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
writeNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, getAPIVersionsFunc(req))
}
}
// TODO: Remove in 1.6. Returns if kubectl is older than v1.5.0
func isOldKubectl(userAgent string) bool {
// example userAgent string: kubectl-1.3/v1.3.8 (linux/amd64) kubernetes/e328d5b
if !strings.Contains(userAgent, "kubectl") {
return false
}
userAgent = strings.Split(userAgent, " ")[0]
subs := strings.Split(userAgent, "/")
if len(subs) != 2 {
return false
}
kubectlVersion, versionErr := utilversion.ParseSemantic(subs[1])
if versionErr != nil {
return false
}
return kubectlVersion.LessThan(utilversion.MustParseSemantic("v1.5.0"))
}
// TODO: Remove in 1.6. This is for backward compatibility with 1.4 kubectl.
// See https://github.com/kubernetes/kubernetes/issues/35791
var groupsWithNewVersionsIn1_5 = sets.NewString("apps", "policy")
// TODO: Remove in 1.6.
func filterAPIGroups(req *restful.Request, groups []metav1.APIGroup) []metav1.APIGroup {
if !isOldKubectl(req.HeaderParameter("User-Agent")) {
return groups
}
// hide API group that has new versions added in 1.5.
var ret []metav1.APIGroup
for _, group := range groups {
if groupsWithNewVersionsIn1_5.Has(group.Name) {
continue
}
ret = append(ret, group)
}
return ret
}
// RootAPIHandler returns a handler which will list the provided groups and versions as available.
func RootAPIHandler(s runtime.NegotiatedSerializer, f func(req *restful.Request) []metav1.APIGroup) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
writeNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &metav1.APIGroupList{Groups: filterAPIGroups(req, f(req))})
}
}
// GroupHandler returns a handler which will return the api.GroupAndVersion of
// the group.
func GroupHandler(s runtime.NegotiatedSerializer, group metav1.APIGroup) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
writeNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &group)
}
}
// SupportedResourcesHandler returns a handler which will list the provided resources as available.
func SupportedResourcesHandler(s runtime.NegotiatedSerializer, groupVersion schema.GroupVersion, lister APIResourceLister) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
writeNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &metav1.APIResourceList{GroupVersion: groupVersion.String(), APIResources: lister.ListAPIResources()})
}
}
// write renders a returned runtime.Object to the response as a stream or an encoded object. If the object
// returned by the response implements rest.ResourceStreamer that interface will be used to render the
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func write(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if !ok {
writeNegotiated(s, gv, w, req, statusCode, object)
return
}
out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept"))
if err != nil {
errorNegotiated(err, s, gv, w, req)
return
}
if out == nil {
// No output provided - return StatusNoContent
w.WriteHeader(http.StatusNoContent)
return
}
defer out.Close()
if wsstream.IsWebSocketRequest(req) {
r := wsstream.NewReader(out, true, wsstream.NewDefaultReaderProtocols())
if err := r.Copy(w, req); err != nil {
utilruntime.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err))
}
return
}
if len(contentType) == 0 {
contentType = "application/octet-stream"
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(statusCode)
writer := w.(io.Writer)
if flush {
writer = flushwriter.Wrap(w)
}
io.Copy(writer, out)
}
// writeNegotiated renders an object in the content type negotiated by the client
func writeNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiateOutputSerializer(req, s)
if err != nil {
status := errToAPIStatus(err)
WriteRawJSON(int(status.Code), status, w)
return
}
w.Header().Set("Content-Type", serializer.MediaType)
w.WriteHeader(statusCode)
encoder := s.EncoderForVersion(serializer.Serializer, gv)
if err := encoder.Encode(object, w); err != nil {
errorJSONFatal(err, encoder, w)
}
}
// errorNegotiated renders an error to the response. Returns the HTTP status code of the error.
func errorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int {
status := errToAPIStatus(err)
code := int(status.Code)
// when writing an error, check to see if the status indicates a retry after period
if status.Details != nil && status.Details.RetryAfterSeconds > 0 {
delay := strconv.Itoa(int(status.Details.RetryAfterSeconds))
w.Header().Set("Retry-After", delay)
}
writeNegotiated(s, gv, w, req, code, status)
return code
}
// errorJSONFatal renders an error to the response, and if codec fails will render plaintext.
// Returns the HTTP status code of the error.
func errorJSONFatal(err error, codec runtime.Encoder, w http.ResponseWriter) int {
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := errToAPIStatus(err)
code := int(status.Code)
output, err := runtime.Encode(codec, status)
if err != nil {
w.WriteHeader(code)
fmt.Fprintf(w, "%s: %s", status.Reason, status.Message)
return code
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
w.Write(output)
return code
}
// WriteRawJSON writes a non-API object in JSON.
func WriteRawJSON(statusCode int, object interface{}, w http.ResponseWriter) {
output, err := json.MarshalIndent(object, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write(output)
}
func parseTimeout(str string) time.Duration {
if str != "" {
timeout, err := time.ParseDuration(str)
if err == nil {
return timeout
}
glog.Errorf("Failed to parse %q: %v", str, err)
}
return 30 * time.Second
}
func readBody(req *http.Request) ([]byte, error) {
defer req.Body.Close()
return ioutil.ReadAll(req.Body)
}

View File

@ -23,6 +23,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
@ -38,10 +39,12 @@ import (
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/api/v1"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/filters"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/apiserver/request"
apiservertesting "k8s.io/kubernetes/pkg/apiserver/testing"
"k8s.io/kubernetes/pkg/fields"
@ -58,10 +61,6 @@ import (
"github.com/emicklei/go-restful"
)
func convert(obj runtime.Object) (runtime.Object, error) {
return obj, nil
}
// This creates fake API versions, similar to api/latest.go.
var testAPIGroup = "test.group"
var testAPIGroup2 = "test.group2"
@ -79,12 +78,10 @@ var grouplessPrefix = "api"
var groupVersions = []schema.GroupVersion{grouplessGroupVersion, testGroupVersion, newGroupVersion}
var codec = api.Codecs.LegacyCodec(groupVersions...)
var grouplessCodec = api.Codecs.LegacyCodec(grouplessGroupVersion)
var testCodec = api.Codecs.LegacyCodec(testGroupVersion)
var newCodec = api.Codecs.LegacyCodec(newGroupVersion)
var accessor = meta.NewAccessor()
var versioner runtime.ResourceVersioner = accessor
var selfLinker runtime.SelfLinker = accessor
var mapper, namespaceMapper meta.RESTMapper // The mappers with namespace and with legacy namespace scopes.
var admissionControl admission.Interface
@ -2773,18 +2770,6 @@ func TestUpdateChecksDecode(t *testing.T) {
}
}
func TestParseTimeout(t *testing.T) {
if d := parseTimeout(""); d != 30*time.Second {
t.Errorf("blank timeout produces %v", d)
}
if d := parseTimeout("not a timeout"); d != 30*time.Second {
t.Errorf("bad timeout produces %v", d)
}
if d := parseTimeout("10s"); d != 10*time.Second {
t.Errorf("10s timeout produced: %v", d)
}
}
type setTestSelfLinker struct {
t *testing.T
expectedSet string
@ -3089,7 +3074,7 @@ func (obj *UnregisteredAPIObject) GetObjectKind() schema.ObjectKind {
func TestWriteJSONDecodeError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
writeNegotiated(api.Codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
responsewriters.WriteObjectNegotiated(api.Codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
}))
defer server.Close()
// We send a 200 status code before we encode the object, so we expect OK, but there will
@ -3114,7 +3099,7 @@ func (m *marshalError) MarshalJSON() ([]byte, error) {
func TestWriteRAWJSONMarshalError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
WriteRawJSON(http.StatusOK, &marshalError{errors.New("Undecodable")}, w)
responsewriters.WriteRawJSON(http.StatusOK, &marshalError{errors.New("Undecodable")}, w)
}))
defer server.Close()
client := http.Client{}
@ -3425,3 +3410,15 @@ func newTestRequestInfoResolver() *request.RequestInfoFactory {
GrouplessAPIPrefixes: sets.NewString("api"),
}
}
const benchmarkSeed = 100
func benchmarkItems() []api.Pod {
apiObjectFuzzer := apitesting.FuzzerFor(nil, api.SchemeGroupVersion, rand.NewSource(benchmarkSeed))
items := make([]api.Pod, 3)
for i := range items {
apiObjectFuzzer.Fuzz(&items[i])
items[i].Spec.InitContainers, items[i].Status.InitContainerStatuses = nil, nil
}
return items
}

172
pkg/apiserver/discovery.go Normal file
View File

@ -0,0 +1,172 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"bytes"
"fmt"
"io"
"net/http"
"github.com/emicklei/go-restful"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/handlers"
"k8s.io/kubernetes/pkg/apiserver/handlers/negotiation"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
)
// AddApiWebService adds a service to return the supported api versions at the legacy /api.
func AddApiWebService(s runtime.NegotiatedSerializer, container *restful.Container, apiPrefix string, getAPIVersionsFunc func(req *restful.Request) *metav1.APIVersions) {
// TODO: InstallREST should register each version automatically
// Because in release 1.1, /api returns response with empty APIVersion, we
// use StripVersionNegotiatedSerializer to keep the response backwards
// compatible.
mediaTypes, _ := negotiation.MediaTypesForSerializer(s)
ss := stripVersionNegotiatedSerializer{s}
versionHandler := APIVersionHandler(ss, getAPIVersionsFunc)
ws := new(restful.WebService)
ws.Path(apiPrefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(versionHandler).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIVersions{}))
container.Add(ws)
}
// stripVersionEncoder strips APIVersion field from the encoding output. It's
// used to keep the responses at the discovery endpoints backward compatible
// with release-1.1, when the responses have empty APIVersion.
type stripVersionEncoder struct {
encoder runtime.Encoder
serializer runtime.Serializer
}
func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error {
buf := bytes.NewBuffer([]byte{})
err := c.encoder.Encode(obj, buf)
if err != nil {
return err
}
roundTrippedObj, gvk, err := c.serializer.Decode(buf.Bytes(), nil, nil)
if err != nil {
return err
}
gvk.Group = ""
gvk.Version = ""
roundTrippedObj.GetObjectKind().SetGroupVersionKind(*gvk)
return c.serializer.Encode(roundTrippedObj, w)
}
// stripVersionNegotiatedSerializer will return stripVersionEncoder when
// EncoderForVersion is called. See comments for stripVersionEncoder.
type stripVersionNegotiatedSerializer struct {
runtime.NegotiatedSerializer
}
func (n stripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
serializer, ok := encoder.(runtime.Serializer)
if !ok {
// The stripVersionEncoder needs both an encoder and decoder, but is called from a context that doesn't have access to the
// decoder. We do a best effort cast here (since this code path is only for backwards compatibility) to get access to the caller's
// decoder.
panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder))
}
versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv)
return stripVersionEncoder{versioned, serializer}
}
func keepUnversioned(group string) bool {
return group == "" || group == "extensions"
}
// NewApisWebService returns a webservice serving the available api version under /apis.
func NewApisWebService(s runtime.NegotiatedSerializer, apiPrefix string, f func(req *restful.Request) []metav1.APIGroup) *restful.WebService {
// Because in release 1.1, /apis returns response with empty APIVersion, we
// use StripVersionNegotiatedSerializer to keep the response backwards
// compatible.
ss := stripVersionNegotiatedSerializer{s}
mediaTypes, _ := negotiation.MediaTypesForSerializer(s)
rootAPIHandler := handlers.RootAPIHandler(ss, f)
ws := new(restful.WebService)
ws.Path(apiPrefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(rootAPIHandler).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIGroupList{}))
return ws
}
// NewGroupWebService returns a webservice serving the supported versions, preferred version, and name
// of a group. E.g., such a web service will be registered at /apis/extensions.
func NewGroupWebService(s runtime.NegotiatedSerializer, path string, group metav1.APIGroup) *restful.WebService {
ss := s
if keepUnversioned(group.Name) {
// Because in release 1.1, /apis/extensions returns response with empty
// APIVersion, we use StripVersionNegotiatedSerializer to keep the
// response backwards compatible.
ss = stripVersionNegotiatedSerializer{s}
}
mediaTypes, _ := negotiation.MediaTypesForSerializer(s)
groupHandler := handlers.GroupHandler(ss, group)
ws := new(restful.WebService)
ws.Path(path)
ws.Doc("get information of a group")
ws.Route(ws.GET("/").To(groupHandler).
Doc("get information of a group").
Operation("getAPIGroup").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIGroup{}))
return ws
}
// Adds a service to return the supported resources, E.g., a such web service
// will be registered at /apis/extensions/v1.
func AddSupportedResourcesWebService(s runtime.NegotiatedSerializer, ws *restful.WebService, groupVersion schema.GroupVersion, lister handlers.APIResourceLister) {
ss := s
if keepUnversioned(groupVersion.Group) {
// Because in release 1.1, /apis/extensions/v1beta1 returns response
// with empty APIVersion, we use StripVersionNegotiatedSerializer to
// keep the response backwards compatible.
ss = stripVersionNegotiatedSerializer{s}
}
mediaTypes, _ := negotiation.MediaTypesForSerializer(s)
resourceHandler := handlers.SupportedResourcesHandler(ss, groupVersion, lister)
ws.Route(ws.GET("/").To(resourceHandler).
Doc("get available resources").
Operation("getAPIResources").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIResourceList{}))
}
// APIVersionHandler returns a handler which will list the provided versions as available.
func APIVersionHandler(s runtime.NegotiatedSerializer, getAPIVersionsFunc func(req *restful.Request) *metav1.APIVersions) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
responsewriters.WriteObjectNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, getAPIVersionsFunc(req))
}
}

View File

@ -0,0 +1,149 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"path"
"time"
"github.com/emicklei/go-restful"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/handlers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
)
// APIGroupVersion is a helper for exposing rest.Storage objects as http.Handlers via go-restful
// It handles URLs of the form:
// /${storage_key}[/${object_name}]
// Where 'storage_key' points to a rest.Storage object stored in storage.
// This object should contain all parameterization necessary for running a particular API version
type APIGroupVersion struct {
Storage map[string]rest.Storage
Root string
// GroupVersion is the external group version
GroupVersion schema.GroupVersion
// OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
// schema like api.Status, api.DeleteOptions, and api.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
// empty, defaults to GroupVersion.
OptionsExternalVersion *schema.GroupVersion
Mapper meta.RESTMapper
// Serializer is used to determine how to convert responses from API methods into bytes to send over
// the wire.
Serializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Copier runtime.ObjectCopier
Linker runtime.SelfLinker
Admit admission.Interface
Context api.RequestContextMapper
MinRequestTimeout time.Duration
// SubresourceGroupVersionKind contains the GroupVersionKind overrides for each subresource that is
// accessible from this API group version. The GroupVersionKind is that of the external version of
// the subresource. The key of this map should be the path of the subresource. The keys here should
// match the keys in the Storage map above for subresources.
SubresourceGroupVersionKind map[string]schema.GroupVersionKind
// ResourceLister is an interface that knows how to list resources
// for this API Group.
ResourceLister handlers.APIResourceLister
}
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
installer := g.newInstaller()
ws := installer.NewWebService()
apiResources, registrationErrors := installer.Install(ws)
lister := g.ResourceLister
if lister == nil {
lister = staticLister{apiResources}
}
AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}
// UpdateREST registers the REST handlers for this APIGroupVersion to an existing web service
// in the restful Container. It will use the prefix (root/version) to find the existing
// web service. If a web service does not exist within the container to support the prefix
// this method will return an error.
func (g *APIGroupVersion) UpdateREST(container *restful.Container) error {
installer := g.newInstaller()
var ws *restful.WebService = nil
for i, s := range container.RegisteredWebServices() {
if s.RootPath() == installer.prefix {
ws = container.RegisteredWebServices()[i]
break
}
}
if ws == nil {
return apierrors.NewInternalError(fmt.Errorf("unable to find an existing webservice for prefix %s", installer.prefix))
}
apiResources, registrationErrors := installer.Install(ws)
lister := g.ResourceLister
if lister == nil {
lister = staticLister{apiResources}
}
AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)
return utilerrors.NewAggregate(registrationErrors)
}
// newInstaller is a helper to create the installer. Used by InstallREST and UpdateREST.
func (g *APIGroupVersion) newInstaller() *APIInstaller {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
return installer
}
// staticLister implements the APIResourceLister interface
type staticLister struct {
list []metav1.APIResource
}
func (s staticLister) ListAPIResources() []metav1.APIResource {
return s.list
}
var _ handlers.APIResourceLister = &staticLister{}

View File

@ -0,0 +1,75 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["resthandler_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/rest:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/diff:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//vendor:github.com/emicklei/go-restful",
"//vendor:github.com/evanphx/json-patch",
],
)
go_library(
name = "go_default_library",
srcs = [
"discovery.go",
"doc.go",
"proxy.go",
"resthandler.go",
"watch.go",
],
tags = ["automanaged"],
deps = [
"//pkg/admission:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/meta:go_default_library",
"//pkg/api/rest:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apiserver/handlers/negotiation:go_default_library",
"//pkg/apiserver/handlers/responsewriters:go_default_library",
"//pkg/apiserver/metrics:go_default_library",
"//pkg/apiserver/request:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/httplog:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/runtime/serializer/streaming:go_default_library",
"//pkg/util:go_default_library",
"//pkg/util/httpstream:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/proxy:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//pkg/util/version:go_default_library",
"//pkg/util/wsstream:go_default_library",
"//pkg/watch:go_default_library",
"//pkg/watch/versioned:go_default_library",
"//vendor:github.com/emicklei/go-restful",
"//vendor:github.com/evanphx/json-patch",
"//vendor:github.com/golang/glog",
"//vendor:golang.org/x/net/websocket",
],
)

View File

@ -0,0 +1,95 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package handlers
import (
"net/http"
"strings"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util/sets"
utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/emicklei/go-restful"
)
type APIResourceLister interface {
ListAPIResources() []metav1.APIResource
}
// RootAPIHandler returns a handler which will list the provided groups and versions as available.
func RootAPIHandler(s runtime.NegotiatedSerializer, f func(req *restful.Request) []metav1.APIGroup) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
responsewriters.WriteObjectNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &metav1.APIGroupList{Groups: filterAPIGroups(req, f(req))})
}
}
// GroupHandler returns a handler which will return the api.GroupAndVersion of
// the group.
func GroupHandler(s runtime.NegotiatedSerializer, group metav1.APIGroup) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
responsewriters.WriteObjectNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &group)
}
}
// SupportedResourcesHandler returns a handler which will list the provided resources as available.
func SupportedResourcesHandler(s runtime.NegotiatedSerializer, groupVersion schema.GroupVersion, lister APIResourceLister) restful.RouteFunction {
return func(req *restful.Request, resp *restful.Response) {
responsewriters.WriteObjectNegotiated(s, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, &metav1.APIResourceList{GroupVersion: groupVersion.String(), APIResources: lister.ListAPIResources()})
}
}
// TODO: Remove in 1.6. This is for backward compatibility with 1.4 kubectl.
// See https://github.com/kubernetes/kubernetes/issues/35791
var groupsWithNewVersionsIn1_5 = sets.NewString("apps", "policy")
// TODO: Remove in 1.6.
func filterAPIGroups(req *restful.Request, groups []metav1.APIGroup) []metav1.APIGroup {
if !isOldKubectl(req.HeaderParameter("User-Agent")) {
return groups
}
// hide API group that has new versions added in 1.5.
var ret []metav1.APIGroup
for _, group := range groups {
if groupsWithNewVersionsIn1_5.Has(group.Name) {
continue
}
ret = append(ret, group)
}
return ret
}
// TODO: Remove in 1.6. Returns if kubectl is older than v1.5.0
func isOldKubectl(userAgent string) bool {
// example userAgent string: kubectl-1.3/v1.3.8 (linux/amd64) kubernetes/e328d5b
if !strings.Contains(userAgent, "kubectl") {
return false
}
userAgent = strings.Split(userAgent, " ")[0]
subs := strings.Split(userAgent, "/")
if len(subs) != 2 {
return false
}
kubectlVersion, versionErr := utilversion.ParseSemantic(subs[1])
if versionErr != nil {
return false
}
return kubectlVersion.LessThan(utilversion.MustParseSemantic("v1.5.0"))
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package handlers contains HTTP handlers to implement the apiserver APIs.
package handlers // import "k8s.io/kubernetes/pkg/apiserver/handlers"

View File

@ -0,0 +1,36 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["errors_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/runtime/schema:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"errors.go",
],
tags = ["automanaged"],
deps = [
"//pkg/apis/meta/v1:go_default_library",
"//pkg/storage:go_default_library",
"//pkg/util/runtime:go_default_library",
],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package errors contains HTTP handler related errors
package errors // import "k8s.io/kubernetes/pkg/apiserver/handlers/errors"

View File

@ -14,12 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package errors
import (
"fmt"
"net/http"
"strings"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/storage"
@ -31,8 +30,8 @@ type statusError interface {
Status() metav1.Status
}
// errToAPIStatus converts an error to an metav1.Status object.
func errToAPIStatus(err error) *metav1.Status {
// ErrToAPIStatus converts an error to an metav1.Status object.
func ErrToAPIStatus(err error) *metav1.Status {
switch t := err.(type) {
case statusError:
status := t.Status()
@ -70,19 +69,6 @@ func errToAPIStatus(err error) *metav1.Status {
}
}
// notFound renders a simple not found error.
func notFound(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintf(w, "Not Found: %#v", req.RequestURI)
}
// internalError renders a simple internal error
func internalError(w http.ResponseWriter, req *http.Request, err error) {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: %#v", req.RequestURI)
runtime.HandleError(err)
}
// errAPIPrefixNotFound indicates that a RequestInfo resolution failed because the request isn't under
// any known API prefixes
type errAPIPrefixNotFound struct {
@ -101,41 +87,3 @@ func IsAPIPrefixNotFound(err error) bool {
_, ok := err.(*errAPIPrefixNotFound)
return ok
}
// errNotAcceptable indicates Accept negotiation has failed
// TODO: move to api/errors if other code needs to return this
type errNotAcceptable struct {
accepted []string
}
func (e errNotAcceptable) Error() string {
return fmt.Sprintf("only the following media types are accepted: %v", strings.Join(e.accepted, ", "))
}
func (e errNotAcceptable) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotAcceptable,
Reason: metav1.StatusReason("NotAcceptable"),
Message: e.Error(),
}
}
// errUnsupportedMediaType indicates Content-Type is not recognized
// TODO: move to api/errors if other code needs to return this
type errUnsupportedMediaType struct {
accepted []string
}
func (e errUnsupportedMediaType) Error() string {
return fmt.Sprintf("the body of the request was in an unknown format - accepted media types include: %v", strings.Join(e.accepted, ", "))
}
func (e errUnsupportedMediaType) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusUnsupportedMediaType,
Reason: metav1.StatusReason("UnsupportedMediaType"),
Message: e.Error(),
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package errors
import (
stderrs "errors"
@ -65,7 +65,7 @@ func TestErrorsToAPIStatus(t *testing.T) {
},
}
for k, v := range cases {
actual := errToAPIStatus(k)
actual := ErrToAPIStatus(k)
if !reflect.DeepEqual(actual, &v) {
t.Errorf("%s: Expected %#v, Got %#v", k, v, actual)
}

View File

@ -0,0 +1,36 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["negotiate_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/apis/meta/v1:go_default_library",
"//pkg/runtime:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"errors.go",
"negotiate.go",
],
tags = ["automanaged"],
deps = [
"//pkg/apis/meta/v1:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//vendor:bitbucket.org/ww/goautoneg",
],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package negotation contains media type negotiation logic.
package negotiation // import "k8s.io/kubernetes/pkg/apiserver/handlers/negotiation"

View File

@ -0,0 +1,61 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package negotiation
import (
"fmt"
"net/http"
"strings"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
)
// errNotAcceptable indicates Accept negotiation has failed
type errNotAcceptable struct {
accepted []string
}
func (e errNotAcceptable) Error() string {
return fmt.Sprintf("only the following media types are accepted: %v", strings.Join(e.accepted, ", "))
}
func (e errNotAcceptable) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotAcceptable,
Reason: metav1.StatusReason("NotAcceptable"),
Message: e.Error(),
}
}
// errUnsupportedMediaType indicates Content-Type is not recognized
type errUnsupportedMediaType struct {
accepted []string
}
func (e errUnsupportedMediaType) Error() string {
return fmt.Sprintf("the body of the request was in an unknown format - accepted media types include: %v", strings.Join(e.accepted, ", "))
}
func (e errUnsupportedMediaType) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusUnsupportedMediaType,
Reason: metav1.StatusReason("UnsupportedMediaType"),
Message: e.Error(),
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package negotiation
import (
"mime"
@ -28,8 +28,8 @@ import (
"k8s.io/kubernetes/pkg/runtime/schema"
)
// mediaTypesForSerializer returns a list of media and stream media types for the server.
func mediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, streamMediaTypes []string) {
// MediaTypesForSerializer returns a list of media and stream media types for the server.
func MediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, streamMediaTypes []string) {
for _, info := range ns.SupportedMediaTypes() {
mediaTypes = append(mediaTypes, info.MediaType)
if info.StreamSerializer != nil {
@ -40,10 +40,10 @@ func mediaTypesForSerializer(ns runtime.NegotiatedSerializer) (mediaTypes, strea
return mediaTypes, streamMediaTypes
}
func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
func NegotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
mediaType, ok := negotiateMediaTypeOptions(req.Header.Get("Accept"), acceptedMediaTypesForEndpoint(ns), defaultEndpointRestrictions)
if !ok {
supported, _ := mediaTypesForSerializer(ns)
supported, _ := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, errNotAcceptable{supported}
}
// TODO: move into resthandler
@ -54,16 +54,16 @@ func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerialize
return info, nil
}
func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
func NegotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
mediaType, ok := negotiateMediaTypeOptions(req.Header.Get("Accept"), acceptedMediaTypesForEndpoint(ns), defaultEndpointRestrictions)
if !ok || mediaType.accepted.Serializer.StreamSerializer == nil {
_, supported := mediaTypesForSerializer(ns)
_, supported := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, errNotAcceptable{supported}
}
return mediaType.accepted.Serializer, nil
}
func negotiateInputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
func NegotiateInputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
mediaTypes := ns.SupportedMediaTypes()
mediaType := req.Header.Get("Content-Type")
if len(mediaType) == 0 {
@ -71,7 +71,7 @@ func negotiateInputSerializer(req *http.Request, ns runtime.NegotiatedSerializer
}
mediaType, _, err := mime.ParseMediaType(mediaType)
if err != nil {
_, supported := mediaTypesForSerializer(ns)
_, supported := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, errUnsupportedMediaType{supported}
}
@ -82,7 +82,7 @@ func negotiateInputSerializer(req *http.Request, ns runtime.NegotiatedSerializer
return info, nil
}
_, supported := mediaTypesForSerializer(ns)
_, supported := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, errUnsupportedMediaType{supported}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package negotiation
import (
"net/http"
@ -25,6 +25,11 @@ import (
"k8s.io/kubernetes/pkg/runtime"
)
// statusError is an object that can be converted into an metav1.Status
type statusError interface {
Status() metav1.Status
}
type fakeNegotiater struct {
serializer, streamSerializer runtime.Serializer
framer runtime.Framer
@ -207,7 +212,7 @@ func TestNegotiate(t *testing.T) {
req = &http.Request{Header: http.Header{}}
req.Header.Set("Accept", test.accept)
}
s, err := negotiateOutputSerializer(req, test.ns)
s, err := NegotiateOutputSerializer(req, test.ns)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: expected error", i)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package handlers
import (
"errors"
@ -30,7 +30,9 @@ import (
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/apiserver/metrics"
"k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
@ -39,16 +41,15 @@ import (
proxyutil "k8s.io/kubernetes/pkg/util/proxy"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apiserver/request"
)
// ProxyHandler provides a http.Handler which will proxy traffic to locations
// specified by items implementing Redirector.
type ProxyHandler struct {
prefix string
storage map[string]rest.Storage
serializer runtime.NegotiatedSerializer
mapper api.RequestContextMapper
Prefix string
Storage map[string]rest.Storage
Serializer runtime.NegotiatedSerializer
Mapper api.RequestContextMapper
}
func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -60,21 +61,21 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
reqStart := time.Now()
defer metrics.Monitor(&verb, &apiResource, net.GetHTTPClient(req), w.Header().Get("Content-Type"), httpCode, reqStart)
ctx, ok := r.mapper.Get(req)
ctx, ok := r.Mapper.Get(req)
if !ok {
internalError(w, req, errors.New("Error getting request context"))
responsewriters.InternalError(w, req, errors.New("Error getting request context"))
httpCode = http.StatusInternalServerError
return
}
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
internalError(w, req, errors.New("Error getting RequestInfo from context"))
responsewriters.InternalError(w, req, errors.New("Error getting RequestInfo from context"))
httpCode = http.StatusInternalServerError
return
}
if !requestInfo.IsResourceRequest {
notFound(w, req)
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
}
@ -83,7 +84,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx = api.WithNamespace(ctx, namespace)
if len(parts) < 2 {
notFound(w, req)
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
}
@ -99,10 +100,10 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
remainder = remainder + "/"
}
}
storage, ok := r.storage[resource]
storage, ok := r.Storage[resource]
if !ok {
httplog.LogOf(req, w).Addf("'%v' has no storage object", resource)
notFound(w, req)
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
}
@ -113,19 +114,19 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
redirector, ok := storage.(rest.Redirector)
if !ok {
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
httpCode = errorNegotiated(apierrors.NewMethodNotSupported(api.Resource(resource), "proxy"), r.serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(apierrors.NewMethodNotSupported(api.Resource(resource), "proxy"), r.Serializer, gv, w, req)
return
}
location, roundTripper, err := redirector.ResourceLocation(ctx, id)
if err != nil {
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
httpCode = errorNegotiated(err, r.serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
return
}
if location == nil {
httplog.LogOf(req, w).Addf("ResourceLocation for %v returned nil", id)
notFound(w, req)
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
}
@ -153,7 +154,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newReq, err := http.NewRequest(req.Method, location.String(), req.Body)
if err != nil {
httpCode = errorNegotiated(err, r.serializer, gv, w, req)
httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
return
}
httpCode = http.StatusOK
@ -197,9 +198,9 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
if !alreadyRewriting {
glog.V(5).Infof("[%x] making a transport for proxy %s...", proxyHandlerTraceID, req.URL)
prepend := path.Join(r.prefix, resource, id)
prepend := path.Join(r.Prefix, resource, id)
if len(namespace) > 0 {
prepend = path.Join(r.prefix, "namespaces", namespace, resource, id)
prepend = path.Join(r.Prefix, "namespaces", namespace, resource, id)
}
pTransport := &proxyutil.Transport{
Scheme: req.URL.Scheme,
@ -221,7 +222,7 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque
}
backendConn, err := proxyutil.DialURL(location, transport)
if err != nil {
errorNegotiated(err, r.serializer, gv, w, req)
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
return true
}
defer backendConn.Close()
@ -231,13 +232,13 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque
// hijack, just for reference...
requestHijackedConn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
errorNegotiated(err, r.serializer, gv, w, req)
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
return true
}
defer requestHijackedConn.Close()
if err = newReq.Write(backendConn); err != nil {
errorNegotiated(err, r.serializer, gv, w, req)
responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req)
return true
}

View File

@ -0,0 +1,27 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"writers.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/rest:go_default_library",
"//pkg/apiserver/handlers/errors:go_default_library",
"//pkg/apiserver/handlers/negotiation:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/util/flushwriter:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/wsstream:go_default_library",
],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package responsewriters containers helpers to write responses in HTTP handlers.
package responsewriters // import "k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"

View File

@ -0,0 +1,152 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package responsewriters
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"k8s.io/kubernetes/pkg/api/rest"
handlererrors "k8s.io/kubernetes/pkg/apiserver/handlers/errors"
"k8s.io/kubernetes/pkg/apiserver/handlers/negotiation"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util/flushwriter"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wsstream"
)
// WriteObject renders a returned runtime.Object to the response as a stream or an encoded object. If the object
// returned by the response implements rest.ResourceStreamer that interface will be used to render the
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if !ok {
WriteObjectNegotiated(s, gv, w, req, statusCode, object)
return
}
out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept"))
if err != nil {
ErrorNegotiated(err, s, gv, w, req)
return
}
if out == nil {
// No output provided - return StatusNoContent
w.WriteHeader(http.StatusNoContent)
return
}
defer out.Close()
if wsstream.IsWebSocketRequest(req) {
r := wsstream.NewReader(out, true, wsstream.NewDefaultReaderProtocols())
if err := r.Copy(w, req); err != nil {
utilruntime.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err))
}
return
}
if len(contentType) == 0 {
contentType = "application/octet-stream"
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(statusCode)
writer := w.(io.Writer)
if flush {
writer = flushwriter.Wrap(w)
}
io.Copy(writer, out)
}
// WriteObjectNegotiated renders an object in the content type negotiated by the client
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiation.NegotiateOutputSerializer(req, s)
if err != nil {
status := handlererrors.ErrToAPIStatus(err)
WriteRawJSON(int(status.Code), status, w)
return
}
w.Header().Set("Content-Type", serializer.MediaType)
w.WriteHeader(statusCode)
encoder := s.EncoderForVersion(serializer.Serializer, gv)
if err := encoder.Encode(object, w); err != nil {
errorJSONFatal(err, encoder, w)
}
}
// ErrorNegotiated renders an error to the response. Returns the HTTP status code of the error.
func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int {
status := handlererrors.ErrToAPIStatus(err)
code := int(status.Code)
// when writing an error, check to see if the status indicates a retry after period
if status.Details != nil && status.Details.RetryAfterSeconds > 0 {
delay := strconv.Itoa(int(status.Details.RetryAfterSeconds))
w.Header().Set("Retry-After", delay)
}
WriteObjectNegotiated(s, gv, w, req, code, status)
return code
}
// errorJSONFatal renders an error to the response, and if codec fails will render plaintext.
// Returns the HTTP status code of the error.
func errorJSONFatal(err error, codec runtime.Encoder, w http.ResponseWriter) int {
utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := handlererrors.ErrToAPIStatus(err)
code := int(status.Code)
output, err := runtime.Encode(codec, status)
if err != nil {
w.WriteHeader(code)
fmt.Fprintf(w, "%s: %s", status.Reason, status.Message)
return code
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
w.Write(output)
return code
}
// WriteRawJSON writes a non-API object in JSON.
func WriteRawJSON(statusCode int, object interface{}, w http.ResponseWriter) {
output, err := json.MarshalIndent(object, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write(output)
}
// NotFound renders a simple not found error.
func NotFound(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintf(w, "Not Found: %#v", req.RequestURI)
}
// InternalError renders a simple internal error
func InternalError(w http.ResponseWriter, req *http.Request, err error) {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: %#v", req.RequestURI)
utilruntime.HandleError(err)
}

View File

@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package handlers
import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
@ -32,6 +33,8 @@ import (
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/handlers/negotiation"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
@ -86,7 +89,7 @@ type RequestScope struct {
}
func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Request) {
errorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
// getterFunc performs a get request with the given context and object name. The request
@ -118,7 +121,7 @@ func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunc
scope.err(err, res.ResponseWriter, req.Request)
return
}
write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
}
}
@ -228,7 +231,7 @@ type responder struct {
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
write(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.res.ResponseWriter, r.req.Request)
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.res.ResponseWriter, r.req.Request)
}
func (r *responder) Error(err error) {
@ -330,7 +333,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
return
}
trace.Step("Self-linking done")
write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems))
}
}
@ -364,7 +367,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
ctx = api.WithNamespace(ctx, namespace)
gv := scope.Kind.GroupVersion()
s, err := negotiateInputSerializer(req.Request, scope.Serializer)
s, err := negotiation.NegotiateInputSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
@ -423,7 +426,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object
}
trace.Step("Self-link added")
write(http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
responsewriters.WriteObject(http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
}
}
@ -516,7 +519,7 @@ func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper
return
}
write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
}
}
@ -682,7 +685,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
return
}
s, err := negotiateInputSerializer(req.Request, scope.Serializer)
s, err := negotiation.NegotiateInputSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
@ -739,7 +742,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
if wasCreated {
status = http.StatusCreated
}
write(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
responsewriters.WriteObject(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
}
}
@ -771,7 +774,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
return
}
if len(body) > 0 {
s, err := negotiateInputSerializer(req.Request, scope.Serializer)
s, err := negotiation.NegotiateInputSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
@ -836,7 +839,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco
}
}
}
write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
}
}
@ -895,7 +898,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
return
}
if len(body) > 0 {
s, err := negotiateInputSerializer(req.Request, scope.Serializer)
s, err := negotiation.NegotiateInputSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
@ -940,7 +943,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco
}
}
}
writeNegotiated(scope.Serializer, scope.Kind.GroupVersion(), w, req.Request, http.StatusOK, result)
responsewriters.WriteObjectNegotiated(scope.Serializer, scope.Kind.GroupVersion(), w, req.Request, http.StatusOK, result)
}
}
@ -1101,3 +1104,19 @@ func summarizeData(data []byte, maxLength int) string {
return hex.EncodeToString(data)
}
}
func readBody(req *http.Request) ([]byte, error) {
defer req.Body.Close()
return ioutil.ReadAll(req.Body)
}
func parseTimeout(str string) time.Duration {
if str != "" {
timeout, err := time.ParseDuration(str)
if err == nil {
return timeout
}
glog.Errorf("Failed to parse %q: %v", str, err)
}
return 30 * time.Second
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package handlers
import (
"errors"
@ -472,3 +472,15 @@ func TestHasUID(t *testing.T) {
}
}
}
func TestParseTimeout(t *testing.T) {
if d := parseTimeout(""); d != 30*time.Second {
t.Errorf("blank timeout produces %v", d)
}
if d := parseTimeout("not a timeout"); d != 30*time.Second {
t.Errorf("bad timeout produces %v", d)
}
if d := parseTimeout("10s"); d != 10*time.Second {
t.Errorf("10s timeout produced: %v", d)
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package handlers
import (
"bytes"
@ -24,6 +24,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apiserver/handlers/negotiation"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
@ -40,7 +41,7 @@ import (
var neverExitWatch <-chan time.Time = make(chan time.Time)
// timeoutFactory abstracts watch timeout logic for testing
type timeoutFactory interface {
type TimeoutFactory interface {
TimeoutCh() (<-chan time.Time, func() bool)
}
@ -63,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
// negotiate for the stream serializer
serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
serializer, err := negotiation.NegotiateOutputStreamSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
@ -89,21 +90,21 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Reques
}
server := &WatchServer{
watching: watcher,
scope: scope,
Watching: watcher,
Scope: scope,
useTextFraming: useTextFraming,
mediaType: mediaType,
framer: framer,
encoder: encoder,
embeddedEncoder: embeddedEncoder,
fixup: func(obj runtime.Object) {
UseTextFraming: useTextFraming,
MediaType: mediaType,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) {
if err := setSelfLink(obj, req, scope.Namer); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
}
},
t: &realTimeoutFactory{timeout},
TimeoutFactory: &realTimeoutFactory{timeout},
}
server.ServeHTTP(res.ResponseWriter, req.Request)
@ -111,22 +112,22 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Reques
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
type WatchServer struct {
watching watch.Interface
scope RequestScope
Watching watch.Interface
Scope RequestScope
// true if websocket messages should use text framing (as opposed to binary framing)
useTextFraming bool
UseTextFraming bool
// the media type this watch is being served with
mediaType string
MediaType string
// used to frame the watch stream
framer runtime.Framer
Framer runtime.Framer
// used to encode the watch stream event itself
encoder runtime.Encoder
Encoder runtime.Encoder
// used to encode the nested object in the watch stream
embeddedEncoder runtime.Encoder
fixup func(runtime.Object)
EmbeddedEncoder runtime.Encoder
Fixup func(runtime.Object)
t timeoutFactory
TimeoutFactory TimeoutFactory
}
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
@ -135,7 +136,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w = httplog.Unlogged(w)
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.mediaType)
w.Header().Set("Content-Type", s.MediaType)
websocket.Handler(s.HandleWS).ServeHTTP(w, req)
return
}
@ -144,34 +145,34 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !ok {
err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w)
utilruntime.HandleError(err)
s.scope.err(errors.NewInternalError(err), w, req)
s.Scope.err(errors.NewInternalError(err), w, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
utilruntime.HandleError(err)
s.scope.err(errors.NewInternalError(err), w, req)
s.Scope.err(errors.NewInternalError(err), w, req)
return
}
framer := s.framer.NewFrameWriter(w)
framer := s.Framer.NewFrameWriter(w)
if framer == nil {
// programmer error
err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType)
err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
utilruntime.HandleError(err)
s.scope.err(errors.NewBadRequest(err.Error()), w, req)
s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
return
}
e := streaming.NewEncoder(framer, s.encoder)
e := streaming.NewEncoder(framer, s.Encoder)
// ensure the connection times out
timeoutCh, cleanup := s.t.TimeoutCh()
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
defer cleanup()
defer s.watching.Stop()
defer s.Watching.Stop()
// begin the stream
w.Header().Set("Content-Type", s.mediaType)
w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
@ -179,7 +180,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var unknown runtime.Unknown
internalEvent := &versioned.InternalEvent{}
buf := &bytes.Buffer{}
ch := s.watching.ResultChan()
ch := s.Watching.ResultChan()
for {
select {
case <-cn.CloseNotify():
@ -193,8 +194,8 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
obj := event.Object
s.fixup(obj)
if err := s.embeddedEncoder.Encode(obj, buf); err != nil {
s.Fixup(obj)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return
@ -239,11 +240,11 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
internalEvent := &versioned.InternalEvent{}
buf := &bytes.Buffer{}
streamBuf := &bytes.Buffer{}
ch := s.watching.ResultChan()
ch := s.Watching.ResultChan()
for {
select {
case <-done:
s.watching.Stop()
s.Watching.Stop()
return
case event, ok := <-ch:
if !ok {
@ -251,8 +252,8 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
return
}
obj := event.Object
s.fixup(obj)
if err := s.embeddedEncoder.Encode(obj, buf); err != nil {
s.Fixup(obj)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return
@ -265,22 +266,22 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
// the internal event will be versioned by the encoder
*internalEvent = versioned.InternalEvent(event)
if err := s.encoder.Encode(internalEvent, streamBuf); err != nil {
if err := s.Encoder.Encode(internalEvent, streamBuf); err != nil {
// encoding error
utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
s.watching.Stop()
s.Watching.Stop()
return
}
if s.useTextFraming {
if s.UseTextFraming {
if err := websocket.Message.Send(ws, streamBuf.String()); err != nil {
// Client disconnect.
s.watching.Stop()
s.Watching.Stop()
return
}
} else {
if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil {
// Client disconnect.
s.watching.Stop()
s.Watching.Stop()
return
}
}

View File

@ -33,6 +33,8 @@ import (
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/apis/extensions"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/handlers"
"k8s.io/kubernetes/pkg/apiserver/handlers/negotiation"
"k8s.io/kubernetes/pkg/apiserver/metrics"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
@ -53,7 +55,7 @@ type action struct {
Verb string // Verb identifying the action ("GET", "POST", "WATCH", PROXY", etc).
Path string // The path of the action
Params []*restful.Parameter // List of parameters associated with the action.
Namer ScopeNamer
Namer handlers.ScopeNamer
AllNamespaces bool // true iff the action is namespaced but works on aggregate result for all namespaces
}
@ -84,11 +86,11 @@ var errEmptyName = errors.NewBadRequest("name must be provided")
func (a *APIInstaller) Install(ws *restful.WebService) (apiResources []metav1.APIResource, errors []error) {
errors = make([]error, 0)
proxyHandler := (&ProxyHandler{
prefix: a.prefix + "/proxy/",
storage: a.group.Storage,
serializer: a.group.Serializer,
mapper: a.group.Context,
proxyHandler := (&handlers.ProxyHandler{
Prefix: a.prefix + "/proxy/",
Storage: a.group.Storage,
Serializer: a.group.Serializer,
Mapper: a.group.Context,
})
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
@ -121,7 +123,7 @@ func (a *APIInstaller) NewWebService() *restful.WebService {
// If we stop using go-restful, we can default empty content-type to application/json on an
// endpoint by endpoint basis
ws.Consumes("*/*")
mediaTypes, streamMediaTypes := mediaTypesForSerializer(a.group.Serializer)
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
ws.Produces(append(mediaTypes, streamMediaTypes...)...)
ws.ApiVersion(a.group.GroupVersion.String())
@ -338,7 +340,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
}
var ctxFn ContextFunc
var ctxFn handlers.ContextFunc
ctxFn = func(req *restful.Request) api.Context {
if context == nil {
return api.WithUserAgent(api.NewContext(), req.HeaderParameter("User-Agent"))
@ -501,12 +503,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
//
// test/integration/auth_test.go is currently the most comprehensive status code test
mediaTypes, streamMediaTypes := mediaTypesForSerializer(a.group.Serializer)
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
allMediaTypes := append(mediaTypes, streamMediaTypes...)
ws.Produces(allMediaTypes...)
kubeVerbs := map[string]struct{}{}
reqScope := RequestScope{
reqScope := handlers.RequestScope{
ContextFunc: ctxFn,
Serializer: a.group.Serializer,
ParameterCodec: a.group.ParameterCodec,
@ -550,9 +552,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
case "GET": // Get a resource.
var handler restful.RouteFunction
if isGetterWithOptions {
handler = GetResourceWithOptions(getterWithOptions, reqScope)
handler = handlers.GetResourceWithOptions(getterWithOptions, reqScope)
} else {
handler = GetResource(getter, exporter, reqScope)
handler = handlers.GetResource(getter, exporter, reqScope)
}
handler = metrics.InstrumentRouteFunc(action.Verb, resource, handler)
doc := "read the specified " + kind
@ -583,7 +585,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "list " + subresource + " of objects of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.ListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -615,7 +617,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "replace " + subresource + " of the specified " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, UpdateResource(updater, reqScope, a.group.Typer, admit))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.UpdateResource(updater, reqScope, a.group.Typer, admit))
route := ws.PUT(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -631,7 +633,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "partially update " + subresource + " of the specified " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, PatchResource(patcher, reqScope, a.group.Typer, admit, mapping.ObjectConvertor))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.PatchResource(patcher, reqScope, a.group.Typer, admit, mapping.ObjectConvertor))
route := ws.PATCH(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -646,9 +648,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = CreateNamedResource(namedCreater, reqScope, a.group.Typer, admit)
handler = handlers.CreateNamedResource(namedCreater, reqScope, a.group.Typer, admit)
} else {
handler = CreateResource(creater, reqScope, a.group.Typer, admit)
handler = handlers.CreateResource(creater, reqScope, a.group.Typer, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, resource, handler)
article := utilstrings.GetArticleForNoun(kind, " ")
@ -672,7 +674,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "delete " + subresource + " of" + article + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, DeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.DeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
route := ws.DELETE(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -693,7 +695,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "delete collection of " + subresource + " of a " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, DeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.DeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit))
route := ws.DELETE(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -712,7 +714,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "watch changes to " + subresource + " of an object of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.ListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -731,7 +733,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "watch individual changes to a list of " + subresource + " of " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.ListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -761,7 +763,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "connect " + method + " requests to " + subresource + " of " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, ConnectResource(connecter, reqScope, admit, path))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, handlers.ConnectResource(connecter, reqScope, admit, path))
route := ws.Method(method).Path(action.Path).
To(handler).
Doc(doc).
@ -802,7 +804,7 @@ type rootScopeNaming struct {
}
// rootScopeNaming implements ScopeNamer
var _ ScopeNamer = rootScopeNaming{}
var _ handlers.ScopeNamer = rootScopeNaming{}
// Namespace returns an empty string because root scoped objects have no namespace.
func (n rootScopeNaming) Namespace(req *restful.Request) (namespace string, err error) {
@ -866,7 +868,7 @@ type scopeNaming struct {
}
// scopeNaming implements ScopeNamer
var _ ScopeNamer = scopeNaming{}
var _ handlers.ScopeNamer = scopeNaming{}
// Namespace returns the namespace from the path or the default.
func (n scopeNaming) Namespace(req *restful.Request) (namespace string, err error) {

View File

@ -1,42 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"github.com/emicklei/go-restful"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
)
func InstallServiceErrorHandler(s runtime.NegotiatedSerializer, container *restful.Container) {
container.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
}
func serviceErrorHandler(s runtime.NegotiatedSerializer, serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
errorNegotiated(
apierrors.NewGenericServerResponse(serviceErr.Code, "", api.Resource(""), "", serviceErr.Message, 0, false),
s,
schema.GroupVersion{},
response.ResponseWriter,
request.Request,
)
}

View File

@ -21,7 +21,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
@ -33,9 +32,9 @@ import (
"golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
apitesting "k8s.io/kubernetes/pkg/api/testing"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver/handlers"
apiservertesting "k8s.io/kubernetes/pkg/apiserver/testing"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
@ -579,16 +578,16 @@ func TestWatchHTTPTimeout(t *testing.T) {
serializer := info.StreamSerializer
// Setup a new watchserver
watchServer := &WatchServer{
watching: watcher,
watchServer := &handlers.WatchServer{
Watching: watcher,
mediaType: "testcase/json",
framer: serializer.Framer,
encoder: newCodec,
embeddedEncoder: newCodec,
MediaType: "testcase/json",
Framer: serializer.Framer,
Encoder: newCodec,
EmbeddedEncoder: newCodec,
fixup: func(obj runtime.Object) {},
t: &fakeTimeoutFactory{timeoutCh, done},
Fixup: func(obj runtime.Object) {},
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -632,18 +631,6 @@ func TestWatchHTTPTimeout(t *testing.T) {
}
}
const benchmarkSeed = 100
func benchmarkItems() []api.Pod {
apiObjectFuzzer := apitesting.FuzzerFor(nil, api.SchemeGroupVersion, rand.NewSource(benchmarkSeed))
items := make([]api.Pod, 3)
for i := range items {
apiObjectFuzzer.Fuzz(&items[i])
items[i].Spec.InitContainers, items[i].Status.InitContainerStatuses = nil, nil
}
return items
}
// BenchmarkWatchHTTP measures the cost of serving a watch.
func BenchmarkWatchHTTP(b *testing.B) {
items := benchmarkItems()

View File

@ -42,7 +42,7 @@ import (
apiserverauthenticator "k8s.io/kubernetes/pkg/apiserver/authenticator"
apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters"
apiserveropenapi "k8s.io/kubernetes/pkg/apiserver/openapi"
"k8s.io/kubernetes/pkg/apiserver/request"
apiserverrequest "k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/authorizer"
authorizerunion "k8s.io/kubernetes/pkg/auth/authorizer/union"
@ -627,7 +627,7 @@ func (s *GenericAPIServer) installAPI(c *Config) {
s.HandlerContainer.Add(s.DynamicApisDiscovery())
}
func NewRequestInfoResolver(c *Config) *request.RequestInfoFactory {
func NewRequestInfoResolver(c *Config) *apiserverrequest.RequestInfoFactory {
apiPrefixes := sets.NewString(strings.Trim(APIGroupPrefix, "/")) // all possible API prefixes
legacyAPIPrefixes := sets.String{} // APIPrefixes that won't have groups (legacy)
for legacyAPIPrefix := range c.LegacyAPIGroupPrefixes {
@ -635,7 +635,7 @@ func NewRequestInfoResolver(c *Config) *request.RequestInfoFactory {
legacyAPIPrefixes.Insert(strings.Trim(legacyAPIPrefix, "/"))
}
return &request.RequestInfoFactory{
return &apiserverrequest.RequestInfoFactory{
APIPrefixes: apiPrefixes,
GrouplessAPIPrefixes: legacyAPIPrefixes,
}

View File

@ -19,16 +19,16 @@ package filters
import (
"net/http"
"k8s.io/kubernetes/pkg/apiserver/request"
apiserverrequest "k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/util/sets"
)
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request, requestInfo *request.RequestInfo) bool
type LongRunningRequestCheck func(r *http.Request, requestInfo *apiserverrequest.RequestInfo) bool
// BasicLongRunningRequestCheck returns true if the given request has one of the specified verbs or one of the specified subresources
func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) LongRunningRequestCheck {
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
return func(r *http.Request, requestInfo *apiserverrequest.RequestInfo) bool {
if longRunningVerbs.Has(requestInfo.Verb) {
return true
}

View File

@ -22,7 +22,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apiserver/request"
apiserverrequest "k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/util/sets"
@ -67,7 +67,7 @@ func WithMaxInFlightLimit(
handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong"))
return
}
requestInfo, ok := request.RequestInfoFrom(ctx)
requestInfo, ok := apiserverrequest.RequestInfoFrom(ctx)
if !ok {
handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
return

View File

@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters"
"k8s.io/kubernetes/pkg/apiserver/request"
apiserverrequest "k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -36,7 +36,7 @@ func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *b
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
requestContextMapper := api.NewRequestContextMapper()
requestInfoFactory := &request.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
requestInfoFactory := &apiserverrequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
handler := WithMaxInFlightLimit(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// A short, accounted request that does not wait for block WaitGroup.

View File

@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apiserver/request"
apiserverrequest "k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/util/runtime"
)
@ -39,12 +39,12 @@ func WithPanicRecovery(handler http.Handler, requestContextMapper api.RequestCon
logger := httplog.NewLogged(req, &w)
var requestInfo *request.RequestInfo
var requestInfo *apiserverrequest.RequestInfo
ctx, ok := requestContextMapper.Get(req)
if !ok {
glog.Errorf("no context found for request, handler chain must be wrong")
} else {
requestInfo, ok = request.RequestInfoFrom(ctx)
requestInfo, ok = apiserverrequest.RequestInfoFrom(ctx)
if !ok {
glog.Errorf("no RequestInfo found in context, handler chain must be wrong")
}

View File

@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apiserver/request"
apiserverrequest "k8s.io/kubernetes/pkg/apiserver/request"
)
const globalTimeout = time.Minute
@ -46,7 +46,7 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
return time.After(globalTimeout), ""
}
requestInfo, ok := request.RequestInfoFrom(ctx)
requestInfo, ok := apiserverrequest.RequestInfoFrom(ctx)
if !ok {
return time.After(globalTimeout), ""
}

View File

@ -17,9 +17,13 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/apiserver:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/apiserver/handlers/responsewriters:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//vendor:github.com/emicklei/go-restful",
"//vendor:github.com/golang/glog",
],
)

View File

@ -17,12 +17,19 @@ limitations under the License.
package mux
import (
"bytes"
"fmt"
"net/http"
rt "runtime"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
)
// APIContainer is a restful container which in addition support registering
@ -48,9 +55,42 @@ func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *APICon
}
c.Container.ServeMux = mux
c.Container.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
apiserver.InstallRecoverHandler(s, c.Container)
apiserver.InstallServiceErrorHandler(s, c.Container)
c.Container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
c.Container.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
return &c
}
//TODO: Unify with RecoverPanics?
func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{}, w http.ResponseWriter) {
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("recover from panic situation: - %v\r\n", panicReason))
for i := 2; ; i++ {
_, file, line, ok := rt.Caller(i)
if !ok {
break
}
buffer.WriteString(fmt.Sprintf(" %s:%d\r\n", file, line))
}
glog.Errorln(buffer.String())
headers := http.Header{}
if ct := w.Header().Get("Content-Type"); len(ct) > 0 {
headers.Set("Accept", ct)
}
responsewriters.ErrorNegotiated(apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", api.Resource(""), "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers})
}
func serviceErrorHandler(s runtime.NegotiatedSerializer, serviceErr restful.ServiceError, request *restful.Request, resp *restful.Response) {
responsewriters.ErrorNegotiated(
apierrors.NewGenericServerResponse(serviceErr.Code, "", api.Resource(""), "", serviceErr.Message, 0, false),
s,
schema.GroupVersion{},
resp,
request.Request,
)
}

View File

@ -22,7 +22,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apiserver:go_default_library",
"//pkg/apiserver/handlers/responsewriters:go_default_library",
"//pkg/apiserver/metrics:go_default_library",
"//pkg/genericapiserver/mux:go_default_library",
"//pkg/genericapiserver/openapi:go_default_library",

View File

@ -21,7 +21,7 @@ import (
"sort"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/genericapiserver/mux"
)
@ -45,6 +45,6 @@ func (i Index) Install(c *mux.APIContainer) {
// Extract the paths handled using mux handler.
handledPaths = append(handledPaths, c.NonSwaggerRoutes.HandledPaths()...)
sort.Strings(handledPaths)
apiserver.WriteRawJSON(status, metav1.RootPaths{Paths: handledPaths}, w)
responsewriters.WriteRawJSON(status, metav1.RootPaths{Paths: handledPaths}, w)
})
}

View File

@ -21,7 +21,7 @@ import (
"github.com/emicklei/go-restful"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/apiserver/handlers/responsewriters"
"k8s.io/kubernetes/pkg/genericapiserver/mux"
"k8s.io/kubernetes/pkg/version"
)
@ -54,5 +54,5 @@ func (v Version) Install(c *mux.APIContainer) {
// handleVersion writes the server's version information.
func (v Version) handleVersion(req *restful.Request, resp *restful.Response) {
apiserver.WriteRawJSON(http.StatusOK, *v.Version, resp.ResponseWriter)
responsewriters.WriteRawJSON(http.StatusOK, *v.Version, resp.ResponseWriter)
}

View File

@ -19,6 +19,7 @@ go_library(
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/apiserver:go_default_library",
"//pkg/apiserver/handlers:go_default_library",
"//pkg/genericapiserver:go_default_library",
"//pkg/registry/extensions/rest:go_default_library",
"//pkg/registry/extensions/thirdpartyresourcedata:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apiserver"
apiserverhandlers "k8s.io/kubernetes/pkg/apiserver/handlers"
"k8s.io/kubernetes/pkg/genericapiserver"
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
@ -51,7 +52,7 @@ func (d dynamicLister) ListAPIResources() []metav1.APIResource {
return d.m.getExistingThirdPartyResources(d.path)
}
var _ apiserver.APIResourceLister = &dynamicLister{}
var _ apiserverhandlers.APIResourceLister = &dynamicLister{}
type ThirdPartyResourceServer struct {
genericAPIServer *genericapiserver.GenericAPIServer