From 448867073d0959a0377e7b5f14253fd63d594935 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Tue, 26 May 2015 18:39:42 -0700 Subject: [PATCH] Pipe minRequestTimeout as an arg to the apiserver --- cmd/kube-apiserver/app/server.go | 3 +++ pkg/apiserver/api_installer.go | 13 +++++++------ pkg/apiserver/apiserver.go | 15 +++++++++++---- pkg/apiserver/apiserver_test.go | 6 +++--- pkg/apiserver/resthandler.go | 4 ++-- pkg/apiserver/watch.go | 10 ++++++---- pkg/master/master.go | 20 +++++++++++++------- 7 files changed, 45 insertions(+), 26 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index d27b5098f24..b3eca45d191 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -96,6 +96,7 @@ type APIServer struct { ClusterName string EnableProfiling bool MaxRequestsInFlight int + MinRequestTimeout int LongRunningRequestRE string } @@ -204,6 +205,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.ExternalHost, "external-hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)") fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.") + fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", 300, "An optional field indicating the minimum number of seconds a handler must keep a request open before timing it out.") fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.") } @@ -380,6 +382,7 @@ func (s *APIServer) Run(_ []string) error { MasterServiceNamespace: s.MasterServiceNamespace, ClusterName: s.ClusterName, ExternalHost: s.ExternalHost, + MinRequestTimeout: s.MinRequestTimeout, } m := master.New(config) diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 2099d5458d6..2fdcaba9073 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -37,9 +37,10 @@ import ( ) type APIInstaller struct { - group *APIGroupVersion - info *APIRequestInfoResolver - prefix string // Path prefix where API resources are to be registered. + group *APIGroupVersion + info *APIRequestInfoResolver + prefix string // Path prefix where API resources are to be registered. + minRequestTimeout int } // Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc). @@ -419,7 +420,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag addParams(route, action.Params) ws.Route(route) case "LIST": // List all resources of a kind. - route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false)). + route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)). Filter(m). Doc("list objects of kind "+kind). Operation("list"+kind). @@ -492,7 +493,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag ws.Route(route) // TODO: deprecated case "WATCH": // Watch a resource. - route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)). + route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)). Filter(m). Doc("watch changes to an object of kind "+kind). Operation("watch"+kind). @@ -506,7 +507,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag ws.Route(route) // TODO: deprecated case "WATCHLIST": // Watch all resources of a kind. - route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)). + route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)). Filter(m). Doc("watch individual changes to a list of "+kind). Operation("watch"+kind+"list"). diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 0d1637010de..addf2d447c5 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -140,17 +140,24 @@ const ( MaxTimeoutSecs = 600 ) +// restContainer is a wrapper around a generic restful Container that also contains a MinRequestTimeout +type RestContainer struct { + *restful.Container + MinRequestTimeout int +} + // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. A restful WebService is created for the group and version. -func (g *APIGroupVersion) InstallREST(container *restful.Container) error { +func (g *APIGroupVersion) InstallREST(container *RestContainer) error { info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper} prefix := path.Join(g.Root, g.Version) installer := &APIInstaller{ - group: g, - info: info, - prefix: prefix, + group: g, + info: info, + prefix: prefix, + minRequestTimeout: container.MinRequestTimeout, } ws, registrationErrors := installer.Install() container.Add(ws) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 1ff5cfd7f5e..fdefac6615d 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -231,7 +231,7 @@ func handleInternal(legacy bool, storage map[string]rest.Storage, admissionContr container := restful.NewContainer() container.Router(restful.CurlyRouter{}) mux := container.ServeMux - if err := group.InstallREST(container); err != nil { + if err := group.InstallREST(&RestContainer{container, 0}); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err)) } ws := new(restful.WebService) @@ -1901,7 +1901,7 @@ func TestParentResourceIsRequired(t *testing.T) { Codec: newCodec, } container := restful.NewContainer() - if err := group.InstallREST(container); err == nil { + if err := group.InstallREST(&RestContainer{container, 0}); err == nil { t.Fatal("expected error") } @@ -1929,7 +1929,7 @@ func TestParentResourceIsRequired(t *testing.T) { Codec: newCodec, } container = restful.NewContainer() - if err := group.InstallREST(container); err != nil { + if err := group.InstallREST(&RestContainer{container, 0}); err != nil { t.Fatal(err) } diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 66733822730..ad9e0c59f71 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -185,7 +185,7 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi } // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object. -func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction { +func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout int) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { w := res.ResponseWriter @@ -252,7 +252,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch errorJSON(err, scope.Codec, w) return } - serveWatch(watcher, scope, w, req) + serveWatch(watcher, scope, w, req, minRequestTimeout) return } diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 83c171b7b03..5a10239a3df 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -66,10 +66,12 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { } // serveWatch handles serving requests to the server -func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) { - // Each watch gets a random timeout to avoid thundering herds. Rand is seeded once in the api installer. - timeout := time.Duration(MinTimeoutSecs+rand.Intn(MaxTimeoutSecs-MinTimeoutSecs)) * time.Second - +func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, minRequestTimeout int) { + var timeout time.Duration + if minRequestTimeout > 0 { + // Each watch gets a random timeout to avoid thundering herds. Rand is seeded once in the api installer. + timeout = time.Duration(minRequestTimeout+rand.Intn(2*minRequestTimeout-minRequestTimeout)) * time.Second + } watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) { if err := setSelfLink(obj, req, scope.Namer); err != nil { glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err) diff --git a/pkg/master/master.go b/pkg/master/master.go index 3dc7e0f96ef..ad76e0f3bc8 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -111,6 +111,10 @@ type Config struct { // If specified, all web services will be registered into this container RestfulContainer *restful.Container + // If specified, requests will be allocated a random timeout between this value, and twice this value. + // Note that it is upto the request handlers to ignore or honor this timeout. + MinRequestTimeout int + // Number of masters running; all masters must be started with the // same value for this field. (Numbers > 1 currently untested.) MasterCount int @@ -153,7 +157,7 @@ type Master struct { mux apiserver.Mux muxHelper *apiserver.MuxHelper - handlerContainer *restful.Container + handlerContainer *apiserver.RestContainer rootWebService *restful.WebService enableCoreControllers bool enableLogsSupport bool @@ -341,14 +345,16 @@ func New(c *Config) *Master { serviceReadWritePort: 443, } + var handlerContainer *restful.Container if c.RestfulContainer != nil { m.mux = c.RestfulContainer.ServeMux - m.handlerContainer = c.RestfulContainer + handlerContainer = c.RestfulContainer } else { mux := http.NewServeMux() m.mux = mux - m.handlerContainer = NewHandlerContainer(mux) + handlerContainer = NewHandlerContainer(mux) } + m.handlerContainer = &apiserver.RestContainer{handlerContainer, c.MinRequestTimeout} // Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*}) m.handlerContainer.Router(restful.CurlyRouter{}) m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}} @@ -507,16 +513,16 @@ func (m *Master) init(c *Config) { } apiserver.InstallSupport(m.muxHelper, m.rootWebService) - apiserver.AddApiWebService(m.handlerContainer, c.APIPrefix, apiVersions) + apiserver.AddApiWebService(m.handlerContainer.Container, c.APIPrefix, apiVersions) defaultVersion := m.defaultAPIGroupVersion() requestInfoResolver := &apiserver.APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(defaultVersion.Root, "/")), defaultVersion.Mapper} - apiserver.InstallServiceErrorHandler(m.handlerContainer, requestInfoResolver, apiVersions) + apiserver.InstallServiceErrorHandler(m.handlerContainer.Container, requestInfoResolver, apiVersions) // Register root handler. // We do not register this using restful Webservice since we do not want to surface this in api docs. // Allow master to be embedded in contexts which already have something registered at the root if c.EnableIndex { - m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer, m.muxHelper)) + m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer.Container, m.muxHelper)) } if c.EnableLogsSupport { @@ -649,7 +655,7 @@ func (m *Master) InstallSwaggerAPI() { SwaggerPath: "/swaggerui/", SwaggerFilePath: "/swagger-ui/", } - swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer) + swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer.Container) } func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {