mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-22 18:16:52 +00:00
Move TimeoutHandler+MaxInFlightLimit to Config.New()
This commit is contained in:
parent
1063903d01
commit
8b33a9ed42
@ -23,6 +23,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -157,6 +158,11 @@ type Config struct {
|
||||
// OpenAPIDefinitions is a map of type to OpenAPI spec for all types used in this API server. Failure to provide
|
||||
// this map or any of the models used by the server APIs will result in spec generation failure.
|
||||
OpenAPIDefinitions *common.OpenAPIDefinitions
|
||||
|
||||
// MaxRequestsInFlight is the maximum number of parallel non-long-running requests. Every further
|
||||
// request has to wait.
|
||||
MaxRequestsInFlight int
|
||||
LongRunningRequestRE string
|
||||
}
|
||||
|
||||
func NewConfig(options *options.ServerRunOptions) *Config {
|
||||
@ -191,6 +197,8 @@ func NewConfig(options *options.ServerRunOptions) *Config {
|
||||
Version: "unversioned",
|
||||
},
|
||||
},
|
||||
MaxRequestsInFlight: options.MaxRequestsInFlight,
|
||||
LongRunningRequestRE: options.LongRunningRequestRE,
|
||||
}
|
||||
}
|
||||
|
||||
@ -386,21 +394,34 @@ func (c Config) New() (*GenericAPIServer, error) {
|
||||
handler = authenticatedHandler
|
||||
}
|
||||
|
||||
// TODO: Make this optional? Consumers of GenericAPIServer depend on this currently.
|
||||
s.Handler = handler
|
||||
|
||||
// After all wrapping is done, put a context filter around both handlers
|
||||
var err error
|
||||
handler, err = api.NewRequestContextFilter(c.RequestContextMapper, s.Handler)
|
||||
handler, err := api.NewRequestContextFilter(c.RequestContextMapper, handler)
|
||||
if err != nil {
|
||||
glog.Fatalf("Could not initialize request context filter for s.Handler: %v", err)
|
||||
}
|
||||
|
||||
longRunningRE := regexp.MustCompile(c.LongRunningRequestRE)
|
||||
longRunningRequestCheck := apiserver.BasicLongRunningRequestCheck(longRunningRE, map[string]string{"watch": "true"})
|
||||
longRunningTimeout := func(req *http.Request) (<-chan time.Time, string) {
|
||||
// TODO unify this with apiserver.MaxInFlightLimit
|
||||
if longRunningRequestCheck(req) {
|
||||
return nil, ""
|
||||
}
|
||||
return time.After(globalTimeout), ""
|
||||
}
|
||||
handler = apiserver.TimeoutHandler(apiserver.RecoverPanics(handler, s.NewRequestInfoResolver()), longRunningTimeout)
|
||||
|
||||
var inFlightTokens chan bool
|
||||
if c.MaxRequestsInFlight > 0 {
|
||||
inFlightTokens = make(chan bool, c.MaxRequestsInFlight)
|
||||
}
|
||||
handler = apiserver.MaxInFlightLimit(inFlightTokens, longRunningRequestCheck, handler)
|
||||
s.Handler = handler
|
||||
|
||||
handler, err = api.NewRequestContextFilter(c.RequestContextMapper, s.InsecureHandler)
|
||||
if err != nil {
|
||||
glog.Fatalf("Could not initialize request context filter for s.InsecureHandler: %v", err)
|
||||
}
|
||||
handler = apiserver.TimeoutHandler(apiserver.RecoverPanics(handler, s.NewRequestInfoResolver()), longRunningTimeout)
|
||||
s.InsecureHandler = handler
|
||||
|
||||
s.installGroupsDiscoveryHandler()
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -248,34 +247,16 @@ func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
|
||||
if s.enableOpenAPISupport {
|
||||
s.InstallOpenAPI()
|
||||
}
|
||||
// We serve on 2 ports. See docs/admin/accessing-the-api.md
|
||||
|
||||
secureLocation := ""
|
||||
if options.SecurePort != 0 {
|
||||
secureLocation = net.JoinHostPort(options.BindAddress.String(), strconv.Itoa(options.SecurePort))
|
||||
}
|
||||
insecureLocation := net.JoinHostPort(options.InsecureBindAddress.String(), strconv.Itoa(options.InsecurePort))
|
||||
|
||||
var sem chan bool
|
||||
if options.MaxRequestsInFlight > 0 {
|
||||
sem = make(chan bool, options.MaxRequestsInFlight)
|
||||
}
|
||||
|
||||
longRunningRE := regexp.MustCompile(options.LongRunningRequestRE)
|
||||
longRunningRequestCheck := apiserver.BasicLongRunningRequestCheck(longRunningRE, map[string]string{"watch": "true"})
|
||||
longRunningTimeout := func(req *http.Request) (<-chan time.Time, string) {
|
||||
// TODO unify this with apiserver.MaxInFlightLimit
|
||||
if longRunningRequestCheck(req) {
|
||||
return nil, ""
|
||||
}
|
||||
return time.After(globalTimeout), ""
|
||||
}
|
||||
|
||||
secureStartedCh := make(chan struct{})
|
||||
if secureLocation != "" {
|
||||
handler := apiserver.TimeoutHandler(apiserver.RecoverPanics(s.Handler, s.NewRequestInfoResolver()), longRunningTimeout)
|
||||
secureServer := &http.Server{
|
||||
Addr: secureLocation,
|
||||
Handler: apiserver.MaxInFlightLimit(sem, longRunningRequestCheck, handler),
|
||||
Handler: s.Handler,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
TLSConfig: &tls.Config{
|
||||
// Can't use SSLv3 because of POODLE and BEAST
|
||||
@ -342,10 +323,10 @@ func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
|
||||
close(secureStartedCh)
|
||||
}
|
||||
|
||||
handler := apiserver.TimeoutHandler(apiserver.RecoverPanics(s.InsecureHandler, s.NewRequestInfoResolver()), longRunningTimeout)
|
||||
insecureLocation := net.JoinHostPort(options.InsecureBindAddress.String(), strconv.Itoa(options.InsecurePort))
|
||||
http := &http.Server{
|
||||
Addr: insecureLocation,
|
||||
Handler: handler,
|
||||
Handler: s.InsecureHandler,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user