From a8d2b3a7926394b1c53621804cdeb93e4a61b7c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arda=20G=C3=BC=C3=A7l=C3=BC?= Date: Mon, 8 Nov 2021 17:20:31 +0300 Subject: [PATCH] Rename ServeWithListenerStopped to Serve in secure_serving This PR removes Serve function and uses all required places ServeWithListenerStopped which takes place new Serve function. This function returns ListenerStopped channel can be used to drain requests before shutting down the server. --- .../app/controllermanager.go | 4 +- cmd/kube-scheduler/app/server.go | 4 +- .../apiserver/pkg/server/genericapiserver.go | 2 +- .../apiserver/pkg/server/secure_serving.go | 67 +------------------ .../cloud-provider/app/controllermanager.go | 4 +- .../cmd/webhook/server/server.go | 7 +- 6 files changed, 14 insertions(+), 74 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 7d8f89a980c..4c743342e1d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -216,8 +216,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { if c.SecureServing != nil { unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) - // TODO: handle stoppedCh returned by c.SecureServing.Serve - if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { + // TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve + if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err } } diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 6b84fc175b9..a2ed83f511c 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -169,8 +169,8 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * // Start up the healthz server. if cc.SecureServing != nil { handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) - // TODO: handle stoppedCh returned by c.SecureServing.Serve - if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { + // TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve + if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { // fail early for secure handlers, removing the old error loop from above return fmt.Errorf("failed to start secure server: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 4308e835d0a..b43dff47d43 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -490,7 +490,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow var listenerStoppedCh <-chan struct{} if s.SecureServingInfo != nil && s.Handler != nil { var err error - stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, shutdownTimeout, internalStopCh) + stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh) if err != nil { close(internalStopCh) close(auditStopCh) diff --git a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go index 5626cb3a90f..d4caa08d36a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go @@ -142,73 +142,8 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro // Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails. // The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block. // It returns a stoppedCh that is closed when all non-hijacked active requests have been processed. -func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) { - if s.Listener == nil { - return nil, fmt.Errorf("listener must not be nil") - } - - tlsConfig, err := s.tlsConfig(stopCh) - if err != nil { - return nil, err - } - - secureServer := &http.Server{ - Addr: s.Listener.Addr().String(), - Handler: handler, - MaxHeaderBytes: 1 << 20, - TLSConfig: tlsConfig, - - IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout - ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound - } - - // At least 99% of serialized resources in surveyed clusters were smaller than 256kb. - // This should be big enough to accommodate most API POST requests in a single frame, - // and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`. - const resourceBody99Percentile = 256 * 1024 - - http2Options := &http2.Server{ - IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout - } - - // shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame - http2Options.MaxUploadBufferPerStream = resourceBody99Percentile - http2Options.MaxReadFrameSize = resourceBody99Percentile - - // use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately - if s.HTTP2MaxStreamsPerConnection > 0 { - http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection) - } else { - http2Options.MaxConcurrentStreams = 250 - } - - // increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams - http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams) - - if !s.DisableHTTP2 { - // apply settings to the server - if err := http2.ConfigureServer(secureServer, http2Options); err != nil { - return nil, fmt.Errorf("error configuring http2: %v", err) - } - } - - // use tlsHandshakeErrorWriter to handle messages of tls handshake error - tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr} - tlsErrorLogger := log.New(tlsErrorWriter, "", 0) - secureServer.ErrorLog = tlsErrorLogger - - klog.Infof("Serving securely on %s", secureServer.Addr) - stoppedCh, _, err := RunServer(secureServer, s.Listener, shutdownTimeout, stopCh) - return stoppedCh, err -} - -// ServeWithListenerStopped runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails. -// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. ServeWithListenerStopped does not block. -// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed. // It returns a listenerStoppedCh that is closed when the underlying http Server has stopped listening. -// TODO: do a follow up PR to remove this function, change 'Serve' to return listenerStoppedCh -// and update all components that call 'Serve' -func (s *SecureServingInfo) ServeWithListenerStopped(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) { +func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) { if s.Listener == nil { return nil, nil, fmt.Errorf("listener must not be nil") } diff --git a/staging/src/k8s.io/cloud-provider/app/controllermanager.go b/staging/src/k8s.io/cloud-provider/app/controllermanager.go index 95aa42411c8..dd88a7c24ed 100644 --- a/staging/src/k8s.io/cloud-provider/app/controllermanager.go +++ b/staging/src/k8s.io/cloud-provider/app/controllermanager.go @@ -163,8 +163,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface if c.SecureServing != nil { unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) - // TODO: handle stoppedCh returned by c.SecureServing.Serve - if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { + // TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve + if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err } } diff --git a/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go b/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go index 8ea6d5cf202..bbbeb73668d 100644 --- a/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go +++ b/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go @@ -131,16 +131,21 @@ func (s *Server) Start(ctx context.Context) error { } var shutdownCh <-chan struct{} + var listenerStoppedCh <-chan struct{} if s.secureServing != nil { var err error - shutdownCh, err = s.secureServing.Serve(mux, 0, ctx.Done()) + shutdownCh, listenerStoppedCh, err = s.secureServing.Serve(mux, 0, ctx.Done()) if err != nil { return fmt.Errorf("failed to start secure server: %w", err) } } + <-listenerStoppedCh + klog.V(1).InfoS("[graceful-termination] HTTP Server is stopped listening") + // Wait for graceful shutdown. <-shutdownCh + klog.V(1).Info("[graceful-termination] HTTP Server is exiting") return nil }