mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
apiserver: NonBlockingRun should return a listener stopped channel
NonBlockingRun should also return a channel that gets closed when the underlying HTTP server has stopped listening (during the graceful shutdown period).
This commit is contained in:
parent
d85619030e
commit
a84c1b7100
@ -52,7 +52,7 @@ func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTime
|
|||||||
} else {
|
} else {
|
||||||
klog.Infof("Serving insecurely on %s", s.Listener.Addr())
|
klog.Infof("Serving insecurely on %s", s.Listener.Addr())
|
||||||
}
|
}
|
||||||
_, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
|
_, _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
|
||||||
// NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here
|
// NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -351,10 +351,15 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// close socket after delayed stopCh
|
// close socket after delayed stopCh
|
||||||
stoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled())
|
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening
|
||||||
|
go func() {
|
||||||
|
<-listenerStoppedCh
|
||||||
|
httpServerStoppedListeningCh.Signal()
|
||||||
|
}()
|
||||||
|
|
||||||
drainedCh := s.terminationSignals.InFlightRequestsDrained
|
drainedCh := s.terminationSignals.InFlightRequestsDrained
|
||||||
go func() {
|
go func() {
|
||||||
@ -389,7 +394,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
// NonBlockingRun spawns the secure http server. An error is
|
// NonBlockingRun spawns the secure http server. An error is
|
||||||
// returned if the secure port cannot be listened on.
|
// returned if the secure port cannot be listened on.
|
||||||
// The returned channel is closed when the (asynchronous) termination is finished.
|
// The returned channel is closed when the (asynchronous) termination is finished.
|
||||||
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
|
||||||
// Use a stop channel to allow graceful shutdown without dropping audit events
|
// Use a stop channel to allow graceful shutdown without dropping audit events
|
||||||
// after http server shutdown.
|
// after http server shutdown.
|
||||||
auditStopCh := make(chan struct{})
|
auditStopCh := make(chan struct{})
|
||||||
@ -398,20 +403,22 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
|
|||||||
// before the http server starts serving. Otherwise the Backend.ProcessEvents call might block.
|
// before the http server starts serving. Otherwise the Backend.ProcessEvents call might block.
|
||||||
if s.AuditBackend != nil {
|
if s.AuditBackend != nil {
|
||||||
if err := s.AuditBackend.Run(auditStopCh); err != nil {
|
if err := s.AuditBackend.Run(auditStopCh); err != nil {
|
||||||
return nil, fmt.Errorf("failed to run the audit backend: %v", err)
|
return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use an internal stop channel to allow cleanup of the listeners on error.
|
// Use an internal stop channel to allow cleanup of the listeners on error.
|
||||||
internalStopCh := make(chan struct{})
|
internalStopCh := make(chan struct{})
|
||||||
var stoppedCh <-chan struct{}
|
var stoppedCh <-chan struct{}
|
||||||
|
var listenerStoppedCh <-chan struct{}
|
||||||
if s.SecureServingInfo != nil && s.Handler != nil {
|
if s.SecureServingInfo != nil && s.Handler != nil {
|
||||||
var err error
|
var err error
|
||||||
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
|
klog.V(1).Infof("[graceful-termination] ShutdownTimeout=%s", s.ShutdownTimeout)
|
||||||
|
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, s.ShutdownTimeout, internalStopCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
close(internalStopCh)
|
close(internalStopCh)
|
||||||
close(auditStopCh)
|
close(auditStopCh)
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -434,7 +441,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
|
|||||||
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
|
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return stoppedCh, nil
|
return stoppedCh, listenerStoppedCh, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
|
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
|
||||||
|
@ -590,7 +590,7 @@ func TestGracefulShutdown(t *testing.T) {
|
|||||||
|
|
||||||
// get port
|
// get port
|
||||||
serverPort := ln.Addr().(*net.TCPAddr).Port
|
serverPort := ln.Addr().(*net.TCPAddr).Port
|
||||||
stoppedCh, err := RunServer(insecureServer, ln, 10*time.Second, stopCh)
|
stoppedCh, _, err := RunServer(insecureServer, ln, 10*time.Second, stopCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("RunServer err: %v", err)
|
t.Fatalf("RunServer err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,10 @@ T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed
|
|||||||
up to '60s' (dictated by ShutdownTimeout)
|
up to '60s' (dictated by ShutdownTimeout)
|
||||||
- active long running requests will receive a GOAWAY.
|
- active long running requests will receive a GOAWAY.
|
||||||
|
|
||||||
|
T0+70s: HTTPServerStoppedListening:
|
||||||
|
- this event is signaled when the HTTP Server has stopped listening
|
||||||
|
which is immediately after server.Shutdown has been invoked
|
||||||
|
|
||||||
T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained
|
T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained
|
||||||
- long running requests are outside of this scope
|
- long running requests are outside of this scope
|
||||||
- up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that
|
- up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that
|
||||||
@ -88,6 +92,10 @@ type terminationSignals struct {
|
|||||||
// InFlightRequestsDrained event is signaled when the existing requests
|
// InFlightRequestsDrained event is signaled when the existing requests
|
||||||
// in flight have completed. This is used as signal to shut down the audit backends
|
// in flight have completed. This is used as signal to shut down the audit backends
|
||||||
InFlightRequestsDrained terminationSignal
|
InFlightRequestsDrained terminationSignal
|
||||||
|
|
||||||
|
// HTTPServerStoppedListening termination event is signaled when the
|
||||||
|
// HTTP Server has stopped listening to the underlying socket.
|
||||||
|
HTTPServerStoppedListening terminationSignal
|
||||||
}
|
}
|
||||||
|
|
||||||
// newTerminationSignals returns an instance of terminationSignals interface to be used
|
// newTerminationSignals returns an instance of terminationSignals interface to be used
|
||||||
@ -97,6 +105,7 @@ func newTerminationSignals() terminationSignals {
|
|||||||
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"),
|
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"),
|
||||||
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"),
|
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"),
|
||||||
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
||||||
|
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,6 +192,67 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur
|
|||||||
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
|
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
|
||||||
secureServer.ErrorLog = tlsErrorLogger
|
secureServer.ErrorLog = tlsErrorLogger
|
||||||
|
|
||||||
|
klog.Infof("Serving securely on %s", secureServer.Addr)
|
||||||
|
stoppedCh, _, err := RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
|
||||||
|
return stoppedCh, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeWithListenerStopped runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
|
||||||
|
// The actual server loop (stoppable by closing stopCh) runs in a goroutine, i.e. ServeWithListenerStopped does not block.
|
||||||
|
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
|
||||||
|
// It returns a listenerStoppedCh that is closed when the underlying http Server has stopped listening.
|
||||||
|
// TODO: do a follow up PR to remove this function, change 'Serve' to return listenerStoppedCh
|
||||||
|
// and update all components that call 'Serve'
|
||||||
|
func (s *SecureServingInfo) ServeWithListenerStopped(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
|
||||||
|
if s.Listener == nil {
|
||||||
|
return nil, nil, fmt.Errorf("listener must not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsConfig, err := s.tlsConfig(stopCh)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
secureServer := &http.Server{
|
||||||
|
Addr: s.Listener.Addr().String(),
|
||||||
|
Handler: handler,
|
||||||
|
MaxHeaderBytes: 1 << 20,
|
||||||
|
TLSConfig: tlsConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
// At least 99% of serialized resources in surveyed clusters were smaller than 256kb.
|
||||||
|
// This should be big enough to accommodate most API POST requests in a single frame,
|
||||||
|
// and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`.
|
||||||
|
const resourceBody99Percentile = 256 * 1024
|
||||||
|
|
||||||
|
http2Options := &http2.Server{}
|
||||||
|
|
||||||
|
// shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame
|
||||||
|
http2Options.MaxUploadBufferPerStream = resourceBody99Percentile
|
||||||
|
http2Options.MaxReadFrameSize = resourceBody99Percentile
|
||||||
|
|
||||||
|
// use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately
|
||||||
|
if s.HTTP2MaxStreamsPerConnection > 0 {
|
||||||
|
http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection)
|
||||||
|
} else {
|
||||||
|
http2Options.MaxConcurrentStreams = 250
|
||||||
|
}
|
||||||
|
|
||||||
|
// increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams
|
||||||
|
http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams)
|
||||||
|
|
||||||
|
if !s.DisableHTTP2 {
|
||||||
|
// apply settings to the server
|
||||||
|
if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("error configuring http2: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// use tlsHandshakeErrorWriter to handle messages of tls handshake error
|
||||||
|
tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
|
||||||
|
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
|
||||||
|
secureServer.ErrorLog = tlsErrorLogger
|
||||||
|
|
||||||
klog.Infof("Serving securely on %s", secureServer.Addr)
|
klog.Infof("Serving securely on %s", secureServer.Addr)
|
||||||
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
|
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
|
||||||
}
|
}
|
||||||
@ -207,15 +268,15 @@ func RunServer(
|
|||||||
ln net.Listener,
|
ln net.Listener,
|
||||||
shutDownTimeout time.Duration,
|
shutDownTimeout time.Duration,
|
||||||
stopCh <-chan struct{},
|
stopCh <-chan struct{},
|
||||||
) (<-chan struct{}, error) {
|
) (<-chan struct{}, <-chan struct{}, error) {
|
||||||
if ln == nil {
|
if ln == nil {
|
||||||
return nil, fmt.Errorf("listener must not be nil")
|
return nil, nil, fmt.Errorf("listener must not be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown server gracefully.
|
// Shutdown server gracefully.
|
||||||
stoppedCh := make(chan struct{})
|
serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(stoppedCh)
|
defer close(serverShutdownCh)
|
||||||
<-stopCh
|
<-stopCh
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
|
||||||
server.Shutdown(ctx)
|
server.Shutdown(ctx)
|
||||||
@ -224,6 +285,7 @@ func RunServer(
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
defer close(listenerStoppedCh)
|
||||||
|
|
||||||
var listener net.Listener
|
var listener net.Listener
|
||||||
listener = tcpKeepAliveListener{ln}
|
listener = tcpKeepAliveListener{ln}
|
||||||
@ -242,7 +304,7 @@ func RunServer(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return stoppedCh, nil
|
return serverShutdownCh, listenerStoppedCh, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
|
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
|
||||||
|
Loading…
Reference in New Issue
Block a user