mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-08 03:33:56 +00:00
Pipe minRequestTimeout as an arg to the apiserver
This commit is contained in:
parent
a7ee5559b1
commit
448867073d
@ -96,6 +96,7 @@ type APIServer struct {
|
|||||||
ClusterName string
|
ClusterName string
|
||||||
EnableProfiling bool
|
EnableProfiling bool
|
||||||
MaxRequestsInFlight int
|
MaxRequestsInFlight int
|
||||||
|
MinRequestTimeout int
|
||||||
LongRunningRequestRE string
|
LongRunningRequestRE string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,6 +205,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||||
fs.StringVar(&s.ExternalHost, "external-hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
|
fs.StringVar(&s.ExternalHost, "external-hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
|
||||||
fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.")
|
fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.")
|
||||||
|
fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", 300, "An optional field indicating the minimum number of seconds a handler must keep a request open before timing it out.")
|
||||||
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
|
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -380,6 +382,7 @@ func (s *APIServer) Run(_ []string) error {
|
|||||||
MasterServiceNamespace: s.MasterServiceNamespace,
|
MasterServiceNamespace: s.MasterServiceNamespace,
|
||||||
ClusterName: s.ClusterName,
|
ClusterName: s.ClusterName,
|
||||||
ExternalHost: s.ExternalHost,
|
ExternalHost: s.ExternalHost,
|
||||||
|
MinRequestTimeout: s.MinRequestTimeout,
|
||||||
}
|
}
|
||||||
m := master.New(config)
|
m := master.New(config)
|
||||||
|
|
||||||
|
@ -40,6 +40,7 @@ type APIInstaller struct {
|
|||||||
group *APIGroupVersion
|
group *APIGroupVersion
|
||||||
info *APIRequestInfoResolver
|
info *APIRequestInfoResolver
|
||||||
prefix string // Path prefix where API resources are to be registered.
|
prefix string // Path prefix where API resources are to be registered.
|
||||||
|
minRequestTimeout int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc).
|
// Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc).
|
||||||
@ -419,7 +420,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
|||||||
addParams(route, action.Params)
|
addParams(route, action.Params)
|
||||||
ws.Route(route)
|
ws.Route(route)
|
||||||
case "LIST": // List all resources of a kind.
|
case "LIST": // List all resources of a kind.
|
||||||
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false)).
|
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)).
|
||||||
Filter(m).
|
Filter(m).
|
||||||
Doc("list objects of kind "+kind).
|
Doc("list objects of kind "+kind).
|
||||||
Operation("list"+kind).
|
Operation("list"+kind).
|
||||||
@ -492,7 +493,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
|||||||
ws.Route(route)
|
ws.Route(route)
|
||||||
// TODO: deprecated
|
// TODO: deprecated
|
||||||
case "WATCH": // Watch a resource.
|
case "WATCH": // Watch a resource.
|
||||||
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
|
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)).
|
||||||
Filter(m).
|
Filter(m).
|
||||||
Doc("watch changes to an object of kind "+kind).
|
Doc("watch changes to an object of kind "+kind).
|
||||||
Operation("watch"+kind).
|
Operation("watch"+kind).
|
||||||
@ -506,7 +507,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
|||||||
ws.Route(route)
|
ws.Route(route)
|
||||||
// TODO: deprecated
|
// TODO: deprecated
|
||||||
case "WATCHLIST": // Watch all resources of a kind.
|
case "WATCHLIST": // Watch all resources of a kind.
|
||||||
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
|
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)).
|
||||||
Filter(m).
|
Filter(m).
|
||||||
Doc("watch individual changes to a list of "+kind).
|
Doc("watch individual changes to a list of "+kind).
|
||||||
Operation("watch"+kind+"list").
|
Operation("watch"+kind+"list").
|
||||||
|
@ -140,10 +140,16 @@ const (
|
|||||||
MaxTimeoutSecs = 600
|
MaxTimeoutSecs = 600
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// restContainer is a wrapper around a generic restful Container that also contains a MinRequestTimeout
|
||||||
|
type RestContainer struct {
|
||||||
|
*restful.Container
|
||||||
|
MinRequestTimeout int
|
||||||
|
}
|
||||||
|
|
||||||
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
|
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
|
||||||
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
|
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
|
||||||
// in a slash. A restful WebService is created for the group and version.
|
// in a slash. A restful WebService is created for the group and version.
|
||||||
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
|
func (g *APIGroupVersion) InstallREST(container *RestContainer) error {
|
||||||
info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper}
|
info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper}
|
||||||
|
|
||||||
prefix := path.Join(g.Root, g.Version)
|
prefix := path.Join(g.Root, g.Version)
|
||||||
@ -151,6 +157,7 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
|
|||||||
group: g,
|
group: g,
|
||||||
info: info,
|
info: info,
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
|
minRequestTimeout: container.MinRequestTimeout,
|
||||||
}
|
}
|
||||||
ws, registrationErrors := installer.Install()
|
ws, registrationErrors := installer.Install()
|
||||||
container.Add(ws)
|
container.Add(ws)
|
||||||
|
@ -231,7 +231,7 @@ func handleInternal(legacy bool, storage map[string]rest.Storage, admissionContr
|
|||||||
container := restful.NewContainer()
|
container := restful.NewContainer()
|
||||||
container.Router(restful.CurlyRouter{})
|
container.Router(restful.CurlyRouter{})
|
||||||
mux := container.ServeMux
|
mux := container.ServeMux
|
||||||
if err := group.InstallREST(container); err != nil {
|
if err := group.InstallREST(&RestContainer{container, 0}); err != nil {
|
||||||
panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err))
|
panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err))
|
||||||
}
|
}
|
||||||
ws := new(restful.WebService)
|
ws := new(restful.WebService)
|
||||||
@ -1901,7 +1901,7 @@ func TestParentResourceIsRequired(t *testing.T) {
|
|||||||
Codec: newCodec,
|
Codec: newCodec,
|
||||||
}
|
}
|
||||||
container := restful.NewContainer()
|
container := restful.NewContainer()
|
||||||
if err := group.InstallREST(container); err == nil {
|
if err := group.InstallREST(&RestContainer{container, 0}); err == nil {
|
||||||
t.Fatal("expected error")
|
t.Fatal("expected error")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1929,7 +1929,7 @@ func TestParentResourceIsRequired(t *testing.T) {
|
|||||||
Codec: newCodec,
|
Codec: newCodec,
|
||||||
}
|
}
|
||||||
container = restful.NewContainer()
|
container = restful.NewContainer()
|
||||||
if err := group.InstallREST(container); err != nil {
|
if err := group.InstallREST(&RestContainer{container, 0}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +185,7 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
|
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
|
||||||
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction {
|
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout int) restful.RouteFunction {
|
||||||
return func(req *restful.Request, res *restful.Response) {
|
return func(req *restful.Request, res *restful.Response) {
|
||||||
w := res.ResponseWriter
|
w := res.ResponseWriter
|
||||||
|
|
||||||
@ -252,7 +252,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
|
|||||||
errorJSON(err, scope.Codec, w)
|
errorJSON(err, scope.Codec, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
serveWatch(watcher, scope, w, req)
|
serveWatch(watcher, scope, w, req, minRequestTimeout)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,10 +66,12 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// serveWatch handles serving requests to the server
|
// serveWatch handles serving requests to the server
|
||||||
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) {
|
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, minRequestTimeout int) {
|
||||||
|
var timeout time.Duration
|
||||||
|
if minRequestTimeout > 0 {
|
||||||
// Each watch gets a random timeout to avoid thundering herds. Rand is seeded once in the api installer.
|
// Each watch gets a random timeout to avoid thundering herds. Rand is seeded once in the api installer.
|
||||||
timeout := time.Duration(MinTimeoutSecs+rand.Intn(MaxTimeoutSecs-MinTimeoutSecs)) * time.Second
|
timeout = time.Duration(minRequestTimeout+rand.Intn(2*minRequestTimeout-minRequestTimeout)) * time.Second
|
||||||
|
}
|
||||||
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
|
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
|
||||||
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
||||||
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
|
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
|
||||||
|
@ -111,6 +111,10 @@ type Config struct {
|
|||||||
// If specified, all web services will be registered into this container
|
// If specified, all web services will be registered into this container
|
||||||
RestfulContainer *restful.Container
|
RestfulContainer *restful.Container
|
||||||
|
|
||||||
|
// If specified, requests will be allocated a random timeout between this value, and twice this value.
|
||||||
|
// Note that it is upto the request handlers to ignore or honor this timeout.
|
||||||
|
MinRequestTimeout int
|
||||||
|
|
||||||
// Number of masters running; all masters must be started with the
|
// Number of masters running; all masters must be started with the
|
||||||
// same value for this field. (Numbers > 1 currently untested.)
|
// same value for this field. (Numbers > 1 currently untested.)
|
||||||
MasterCount int
|
MasterCount int
|
||||||
@ -153,7 +157,7 @@ type Master struct {
|
|||||||
|
|
||||||
mux apiserver.Mux
|
mux apiserver.Mux
|
||||||
muxHelper *apiserver.MuxHelper
|
muxHelper *apiserver.MuxHelper
|
||||||
handlerContainer *restful.Container
|
handlerContainer *apiserver.RestContainer
|
||||||
rootWebService *restful.WebService
|
rootWebService *restful.WebService
|
||||||
enableCoreControllers bool
|
enableCoreControllers bool
|
||||||
enableLogsSupport bool
|
enableLogsSupport bool
|
||||||
@ -341,14 +345,16 @@ func New(c *Config) *Master {
|
|||||||
serviceReadWritePort: 443,
|
serviceReadWritePort: 443,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var handlerContainer *restful.Container
|
||||||
if c.RestfulContainer != nil {
|
if c.RestfulContainer != nil {
|
||||||
m.mux = c.RestfulContainer.ServeMux
|
m.mux = c.RestfulContainer.ServeMux
|
||||||
m.handlerContainer = c.RestfulContainer
|
handlerContainer = c.RestfulContainer
|
||||||
} else {
|
} else {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
m.mux = mux
|
m.mux = mux
|
||||||
m.handlerContainer = NewHandlerContainer(mux)
|
handlerContainer = NewHandlerContainer(mux)
|
||||||
}
|
}
|
||||||
|
m.handlerContainer = &apiserver.RestContainer{handlerContainer, c.MinRequestTimeout}
|
||||||
// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
|
// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
|
||||||
m.handlerContainer.Router(restful.CurlyRouter{})
|
m.handlerContainer.Router(restful.CurlyRouter{})
|
||||||
m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}}
|
m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}}
|
||||||
@ -507,16 +513,16 @@ func (m *Master) init(c *Config) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
apiserver.InstallSupport(m.muxHelper, m.rootWebService)
|
apiserver.InstallSupport(m.muxHelper, m.rootWebService)
|
||||||
apiserver.AddApiWebService(m.handlerContainer, c.APIPrefix, apiVersions)
|
apiserver.AddApiWebService(m.handlerContainer.Container, c.APIPrefix, apiVersions)
|
||||||
defaultVersion := m.defaultAPIGroupVersion()
|
defaultVersion := m.defaultAPIGroupVersion()
|
||||||
requestInfoResolver := &apiserver.APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(defaultVersion.Root, "/")), defaultVersion.Mapper}
|
requestInfoResolver := &apiserver.APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(defaultVersion.Root, "/")), defaultVersion.Mapper}
|
||||||
apiserver.InstallServiceErrorHandler(m.handlerContainer, requestInfoResolver, apiVersions)
|
apiserver.InstallServiceErrorHandler(m.handlerContainer.Container, requestInfoResolver, apiVersions)
|
||||||
|
|
||||||
// Register root handler.
|
// Register root handler.
|
||||||
// We do not register this using restful Webservice since we do not want to surface this in api docs.
|
// We do not register this using restful Webservice since we do not want to surface this in api docs.
|
||||||
// Allow master to be embedded in contexts which already have something registered at the root
|
// Allow master to be embedded in contexts which already have something registered at the root
|
||||||
if c.EnableIndex {
|
if c.EnableIndex {
|
||||||
m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer, m.muxHelper))
|
m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer.Container, m.muxHelper))
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.EnableLogsSupport {
|
if c.EnableLogsSupport {
|
||||||
@ -649,7 +655,7 @@ func (m *Master) InstallSwaggerAPI() {
|
|||||||
SwaggerPath: "/swaggerui/",
|
SwaggerPath: "/swaggerui/",
|
||||||
SwaggerFilePath: "/swagger-ui/",
|
SwaggerFilePath: "/swagger-ui/",
|
||||||
}
|
}
|
||||||
swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer)
|
swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer.Container)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
|
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
|
||||||
|
Loading…
Reference in New Issue
Block a user