Rename ServeWithListenerStopped to Serve in secure_serving

This PR removes Serve function and uses all required places
ServeWithListenerStopped which takes place new Serve function.

This function returns ListenerStopped channel can be used to drain
requests before shutting down the server.
This commit is contained in:
Arda Güçlü 2021-11-08 17:20:31 +03:00
parent 582c4ebe26
commit a8d2b3a792
6 changed files with 14 additions and 74 deletions

View File

@ -216,8 +216,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
if c.SecureServing != nil {
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err
}
}

View File

@ -169,8 +169,8 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}

View File

@ -490,7 +490,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow
var listenerStoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, shutdownTimeout, internalStopCh)
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)

View File

@ -142,73 +142,8 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro
// Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
if s.Listener == nil {
return nil, fmt.Errorf("listener must not be nil")
}
tlsConfig, err := s.tlsConfig(stopCh)
if err != nil {
return nil, err
}
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
}
// At least 99% of serialized resources in surveyed clusters were smaller than 256kb.
// This should be big enough to accommodate most API POST requests in a single frame,
// and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`.
const resourceBody99Percentile = 256 * 1024
http2Options := &http2.Server{
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
}
// shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame
http2Options.MaxUploadBufferPerStream = resourceBody99Percentile
http2Options.MaxReadFrameSize = resourceBody99Percentile
// use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately
if s.HTTP2MaxStreamsPerConnection > 0 {
http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection)
} else {
http2Options.MaxConcurrentStreams = 250
}
// increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams
http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams)
if !s.DisableHTTP2 {
// apply settings to the server
if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
return nil, fmt.Errorf("error configuring http2: %v", err)
}
}
// use tlsHandshakeErrorWriter to handle messages of tls handshake error
tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
secureServer.ErrorLog = tlsErrorLogger
klog.Infof("Serving securely on %s", secureServer.Addr)
stoppedCh, _, err := RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
return stoppedCh, err
}
// ServeWithListenerStopped runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. ServeWithListenerStopped does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
// It returns a listenerStoppedCh that is closed when the underlying http Server has stopped listening.
// TODO: do a follow up PR to remove this function, change 'Serve' to return listenerStoppedCh
// and update all components that call 'Serve'
func (s *SecureServingInfo) ServeWithListenerStopped(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
if s.Listener == nil {
return nil, nil, fmt.Errorf("listener must not be nil")
}

View File

@ -163,8 +163,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
if c.SecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err
}
}

View File

@ -131,16 +131,21 @@ func (s *Server) Start(ctx context.Context) error {
}
var shutdownCh <-chan struct{}
var listenerStoppedCh <-chan struct{}
if s.secureServing != nil {
var err error
shutdownCh, err = s.secureServing.Serve(mux, 0, ctx.Done())
shutdownCh, listenerStoppedCh, err = s.secureServing.Serve(mux, 0, ctx.Done())
if err != nil {
return fmt.Errorf("failed to start secure server: %w", err)
}
}
<-listenerStoppedCh
klog.V(1).InfoS("[graceful-termination] HTTP Server is stopped listening")
// Wait for graceful shutdown.
<-shutdownCh
klog.V(1).Info("[graceful-termination] HTTP Server is exiting")
return nil
}