From a84c1b71005930e8253c1348515020132c5c175b Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Thu, 24 Jun 2021 16:04:54 -0400 Subject: [PATCH] apiserver: NonBlockingRun should return a listener stopped channel NonBlockingRun should also return a channel that gets closed when the underlying http Server has stopped listening (during the graceful shutdown period) --- .../pkg/server/deprecated_insecure_serving.go | 2 +- .../apiserver/pkg/server/genericapiserver.go | 19 +++-- .../pkg/server/genericapiserver_test.go | 2 +- .../pkg/server/graceful_termination.go | 9 +++ .../apiserver/pkg/server/secure_serving.go | 72 +++++++++++++++++-- 5 files changed, 91 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go index 655543a2513..1de20682afa 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go @@ -52,7 +52,7 @@ func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTime } else { klog.Infof("Serving insecurely on %s", s.Listener.Addr()) } - _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh) + _, _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh) // NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here return err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index cf2ffae163e..5c4cbd729e3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -351,10 +351,15 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { }() // close socket after delayed stopCh - stoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled()) + stoppedCh, listenerStoppedCh, err := 
s.NonBlockingRun(delayedStopCh.Signaled()) if err != nil { return err } + httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening + go func() { + <-listenerStoppedCh + httpServerStoppedListeningCh.Signal() + }() drainedCh := s.terminationSignals.InFlightRequestsDrained go func() { @@ -389,7 +394,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. // The returned channel is closed when the (asynchronous) termination is finished. -func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) { +func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) { // Use an stop channel to allow graceful shutdown without dropping audit events // after http server shutdown. auditStopCh := make(chan struct{}) @@ -398,20 +403,22 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan // before http server start serving. Otherwise the Backend.ProcessEvents call might block. if s.AuditBackend != nil { if err := s.AuditBackend.Run(auditStopCh); err != nil { - return nil, fmt.Errorf("failed to run the audit backend: %v", err) + return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err) } } // Use an internal stop channel to allow cleanup of the listeners on error. 
internalStopCh := make(chan struct{}) var stoppedCh <-chan struct{} + var listenerStoppedCh <-chan struct{} if s.SecureServingInfo != nil && s.Handler != nil { var err error - stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh) + klog.V(1).Infof("[graceful-termination] ShutdownTimeout=%s", s.ShutdownTimeout) + stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, s.ShutdownTimeout, internalStopCh) if err != nil { close(internalStopCh) close(auditStopCh) - return nil, err + return nil, nil, err } } @@ -434,7 +441,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) } - return stoppedCh, nil + return stoppedCh, listenerStoppedCh, nil } // installAPIResources is a private method for installing the REST storage backing each api groupversionresource diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 3990358e4e3..7a8f2a7f90b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -590,7 +590,7 @@ func TestGracefulShutdown(t *testing.T) { // get port serverPort := ln.Addr().(*net.TCPAddr).Port - stoppedCh, err := RunServer(insecureServer, ln, 10*time.Second, stopCh) + stoppedCh, _, err := RunServer(insecureServer, ln, 10*time.Second, stopCh) if err != nil { t.Fatalf("RunServer err: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go b/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go index 0841a017ef2..817ba074ebd 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go +++ b/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go @@ -47,6 +47,10 @@ T0+70s: AfterShutdownDelayDuration: shutdown delay duration has 
passed up to '60s' (dictated by ShutdownTimeout) - active long running requests will receive a GOAWAY. +T0+70s: HTTPServerStoppedListening: + - this event is signaled when the HTTP Server has stopped listening + which is immediately after server.Shutdown has been invoked + T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained - long running requests are outside of this scope - up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that @@ -88,6 +92,10 @@ type terminationSignals struct { // InFlightRequestsDrained event is signaled when the existing requests // in flight have completed. This is used as signal to shut down the audit backends InFlightRequestsDrained terminationSignal + + // HTTPServerStoppedListening termination event is signaled when the + // HTTP Server has stopped listening to the underlying socket. + HTTPServerStoppedListening terminationSignal } // newTerminationSignals returns an instance of terminationSignals interface to be used @@ -97,6 +105,7 @@ func newTerminationSignals() terminationSignals { ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"), AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"), InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), + HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"), } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go index 356bd3a2fe8..c706afb5f51 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go @@ -192,6 +192,67 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur tlsErrorLogger := log.New(tlsErrorWriter, "", 0) secureServer.ErrorLog = tlsErrorLogger + klog.Infof("Serving securely on %s", secureServer.Addr) + stoppedCh, _, err := RunServer(secureServer, 
s.Listener, shutdownTimeout, stopCh) + return stoppedCh, err +} + +// ServeWithListenerStopped runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails. +// The actual server loop (stoppable by closing stopCh) runs in a goroutine, i.e. ServeWithListenerStopped does not block. +// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed. +// It returns a listenerStoppedCh that is closed when the underlying http Server has stopped listening. +// TODO: do a follow-up PR to remove this function, change 'Serve' to return listenerStoppedCh +// and update all components that call 'Serve' +func (s *SecureServingInfo) ServeWithListenerStopped(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) { + if s.Listener == nil { + return nil, nil, fmt.Errorf("listener must not be nil") + } + + tlsConfig, err := s.tlsConfig(stopCh) + if err != nil { + return nil, nil, err + } + + secureServer := &http.Server{ + Addr: s.Listener.Addr().String(), + Handler: handler, + MaxHeaderBytes: 1 << 20, + TLSConfig: tlsConfig, + } + + // At least 99% of serialized resources in surveyed clusters were smaller than 256 KiB. + // This should be big enough to accommodate most API POST requests in a single frame, + // and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`. 
+ const resourceBody99Percentile = 256 * 1024 + + http2Options := &http2.Server{} + + // shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame + http2Options.MaxUploadBufferPerStream = resourceBody99Percentile + http2Options.MaxReadFrameSize = resourceBody99Percentile + + // use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately + if s.HTTP2MaxStreamsPerConnection > 0 { + http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection) + } else { + http2Options.MaxConcurrentStreams = 250 + } + + // increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams + http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams) + + if !s.DisableHTTP2 { + // apply settings to the server + if err := http2.ConfigureServer(secureServer, http2Options); err != nil { + return nil, nil, fmt.Errorf("error configuring http2: %v", err) + } + } + + // use tlsHandshakeErrorWriter to handle messages of tls handshake error + tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr} + tlsErrorLogger := log.New(tlsErrorWriter, "", 0) + secureServer.ErrorLog = tlsErrorLogger + klog.Infof("Serving securely on %s", secureServer.Addr) return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh) } @@ -207,15 +268,15 @@ func RunServer( ln net.Listener, shutDownTimeout time.Duration, stopCh <-chan struct{}, -) (<-chan struct{}, error) { +) (<-chan struct{}, <-chan struct{}, error) { if ln == nil { - return nil, fmt.Errorf("listener must not be nil") + return nil, nil, fmt.Errorf("listener must not be nil") } // Shutdown server gracefully. 
- stoppedCh := make(chan struct{}) + serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{}) go func() { - defer close(stoppedCh) + defer close(serverShutdownCh) <-stopCh ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout) server.Shutdown(ctx) @@ -224,6 +285,7 @@ func RunServer( go func() { defer utilruntime.HandleCrash() + defer close(listenerStoppedCh) var listener net.Listener listener = tcpKeepAliveListener{ln} @@ -242,7 +304,7 @@ func RunServer( } }() - return stoppedCh, nil + return serverShutdownCh, listenerStoppedCh, nil } // tcpKeepAliveListener sets TCP keep-alive timeouts on accepted