mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Merge pull request #110094 from tkashem/refactor-graceful
apiserver: refactor - move AuditBackend.Run out of NonBlockingRun
This commit is contained in:
commit
f727b5af34
@ -191,7 +191,7 @@ type GenericAPIServer struct {
|
|||||||
livezGracePeriod time.Duration
|
livezGracePeriod time.Duration
|
||||||
livezClock clock.Clock
|
livezClock clock.Clock
|
||||||
|
|
||||||
// auditing. The backend is started after the server starts listening.
|
// auditing. The backend is started before the server starts listening.
|
||||||
AuditBackend audit.Backend
|
AuditBackend audit.Backend
|
||||||
|
|
||||||
// Authorizer determines whether a user is allowed to make a certain request. The Handler does a preliminary
|
// Authorizer determines whether a user is allowed to make a certain request. The Handler does a preliminary
|
||||||
@ -518,10 +518,20 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
<-preShutdownHooksHasStoppedCh
|
<-preShutdownHooksHasStoppedCh
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Start the audit backend before any request comes in. This means we must call Backend.Run
|
||||||
|
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
|
||||||
|
// AuditBackend.Run will stop as soon as all in-flight requests are drained.
|
||||||
|
if s.AuditBackend != nil {
|
||||||
|
if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
|
||||||
|
return fmt.Errorf("failed to run the audit backend: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
|
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
|
httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
|
||||||
go func() {
|
go func() {
|
||||||
<-listenerStoppedCh
|
<-listenerStoppedCh
|
||||||
@ -567,18 +577,6 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
// returned if the secure port cannot be listened on.
|
// returned if the secure port cannot be listened on.
|
||||||
// The returned channel is closed when the (asynchronous) termination is finished.
|
// The returned channel is closed when the (asynchronous) termination is finished.
|
||||||
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
|
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
|
||||||
// Use an stop channel to allow graceful shutdown without dropping audit events
|
|
||||||
// after http server shutdown.
|
|
||||||
auditStopCh := make(chan struct{})
|
|
||||||
|
|
||||||
// Start the audit backend before any request comes in. This means we must call Backend.Run
|
|
||||||
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
|
|
||||||
if s.AuditBackend != nil {
|
|
||||||
if err := s.AuditBackend.Run(auditStopCh); err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use an internal stop channel to allow cleanup of the listeners on error.
|
// Use an internal stop channel to allow cleanup of the listeners on error.
|
||||||
internalStopCh := make(chan struct{})
|
internalStopCh := make(chan struct{})
|
||||||
var stoppedCh <-chan struct{}
|
var stoppedCh <-chan struct{}
|
||||||
@ -588,7 +586,6 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow
|
|||||||
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
|
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
close(internalStopCh)
|
close(internalStopCh)
|
||||||
close(auditStopCh)
|
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -603,7 +600,6 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow
|
|||||||
<-stoppedCh
|
<-stoppedCh
|
||||||
}
|
}
|
||||||
s.HandlerChainWaitGroup.Wait()
|
s.HandlerChainWaitGroup.Wait()
|
||||||
close(auditStopCh)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
s.RunPostStartHooks(stopCh)
|
s.RunPostStartHooks(stopCh)
|
||||||
|
Loading…
Reference in New Issue
Block a user