diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 39c47a1c4b0..60be44a7a21 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -850,7 +850,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithCacheControl(handler) handler = genericfilters.WithHSTS(handler, c.HSTSDirectives) if c.ShutdownSendRetryAfter { - handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled()) } handler = genericfilters.WithHTTPLogging(handler) if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index b98dd800a16..55391627042 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -411,17 +411,6 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { } s.installReadyz() - // Register audit backend preShutdownHook. - if s.AuditBackend != nil { - err := s.AddPreShutdownHook("audit-backend", func() error { - s.AuditBackend.Shutdown() - return nil - }) - if err != nil { - klog.Errorf("Failed to add pre-shutdown hook for audit-backend %s", err) - } - } - return preparedGenericAPIServer{s} } @@ -439,21 +428,31 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // | | // AfterShutdownDelayDuration (delayedStopCh) PreShutdownHooksStopped (preShutdownHooksHasStoppedCh) // | | -// |---------------------------------- | -// | | | -// | (HandlerChainWaitGroup::Wait) | -// | | | -// | InFlightRequestsDrained (drainedCh) | -// | | | -// [without ShutdownSendRetryAfter] [with ShutdownSendRetryAfter] | -// | | | -// --------------------------------------------------------- -// | -// stopHttpServerCh -// | -// listenerStoppedCh -// | -// HTTPServerStoppedListening (httpServerStoppedListeningCh) +// |-------------------------------------------------------| +// | +// | +// NotAcceptingNewRequest (notAcceptingNewRequestCh) +// | +// | +// |---------------------------------------------------------| +// | | | | +// [without [with | | +// ShutdownSendRetryAfter] ShutdownSendRetryAfter] | | +// | | | | +// | ---------------| | +// | | | +// | (HandlerChainWaitGroup::Wait) | +// | | | +// | InFlightRequestsDrained (drainedCh) | +// | | | +// ----------------------------------------|-----------------| +// | | +// stopHttpServerCh (AuditBackend::Shutdown()) +// | +// listenerStoppedCh +// | +// HTTPServerStoppedListening (httpServerStoppedListeningCh) +// func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated @@ -494,8 +493,6 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { }() // close socket after delayed stopCh - drainedCh := s.lifecycleSignals.InFlightRequestsDrained - delayedStopOrDrainedCh := delayedStopCh.Signaled() shutdownTimeout := s.ShutdownTimeout if s.ShutdownSendRetryAfter { // when this mode is enabled, we do the following: @@ -504,19 +501,22 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // - once drained, http Server Shutdown is invoked with a timeout of 2s, // net/http waits for 1s for the peer to respond to a GO_AWAY frame, so // we should wait for a minimum of 2s - delayedStopOrDrainedCh = drainedCh.Signaled() shutdownTimeout = 2 * time.Second klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout) } - // pre-shutdown hooks need to finish before we stop the http server - preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped + notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest + drainedCh := s.lifecycleSignals.InFlightRequestsDrained stopHttpServerCh := make(chan struct{}) go func() { defer close(stopHttpServerCh) - <-delayedStopOrDrainedCh - <-preShutdownHooksHasStoppedCh.Signaled() + timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled() + if s.ShutdownSendRetryAfter { + timeToStopHttpServerCh = drainedCh.Signaled() + } + + <-timeToStopHttpServerCh }() // Start the audit backend before any request comes in. This means we must call Backend.Run @@ -540,13 +540,29 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name()) }() + // we don't accept new request as soon as both ShutdownDelayDuration has + // elapsed and preshutdown hooks have completed. + preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped go func() { - defer drainedCh.Signal() - defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name()) + defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name()) + defer notAcceptingNewRequestCh.Signal() // wait for the delayed stopCh before closing the handler chain <-delayedStopCh.Signaled() + // Additionally wait for preshutdown hooks to also be finished, as some of them need + // to send API calls to clean up after themselves (e.g. lease reconcilers removing + // itself from the active servers). + <-preShutdownHooksHasStoppedCh.Signaled() + }() + + go func() { + defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name()) + defer drainedCh.Signal() + + // wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called). + <-notAcceptingNewRequestCh.Signaled() + // Wait for all requests to finish, which are bounded by the RequestTimeout variable. // once HandlerChainWaitGroup.Wait is invoked, the apiserver is // expected to reject any incoming request with a {503, Retry-After} @@ -578,11 +594,17 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { if err != nil { return err } - klog.V(1).Info("[graceful-termination] RunPreShutdownHooks has completed") // Wait for all requests in flight to drain, bounded by the RequestTimeout variable. <-drainedCh.Signaled() + + if s.AuditBackend != nil { + s.AuditBackend.Shutdown() + klog.V(1).InfoS("[graceful-termination] audit backend shutdown completed") + } + // wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished. + <-listenerStoppedCh <-stoppedCh klog.V(1).Info("[graceful-termination] apiserver is exiting") @@ -612,10 +634,6 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow go func() { <-stopCh close(internalStopCh) - if stoppedCh != nil { - <-stoppedCh - } - s.HandlerChainWaitGroup.Wait() }() s.RunPostStartHooks(stopCh) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index 52495a13e8e..6858796492c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -85,6 +85,7 @@ func wrapLifecycleSignalsWithRecorer(t *testing.T, signals *lifecycleSignals, be // an asynchronous process. signals.AfterShutdownDelayDuration = wrapLifecycleSignal(t, signals.AfterShutdownDelayDuration, before, nil) signals.PreShutdownHooksStopped = wrapLifecycleSignal(t, signals.PreShutdownHooksStopped, before, nil) + signals.NotAcceptingNewRequest = wrapLifecycleSignal(t, signals.NotAcceptingNewRequest, before, nil) signals.HTTPServerStoppedListening = wrapLifecycleSignal(t, signals.HTTPServerStoppedListening, before, nil) signals.InFlightRequestsDrained = wrapLifecycleSignal(t, signals.InFlightRequestsDrained, before, nil) signals.ShutdownInitiated = wrapLifecycleSignal(t, signals.ShutdownInitiated, before, nil) @@ -139,6 +140,7 @@ func newSignalInterceptingTestStep() *signalInterceptingTestStep { // | | // |--------------------------------------------| // | | +// | (NotAcceptingNewRequest) // | | // | |-------------------------------------------------| // | | | @@ -154,13 +156,17 @@ func newSignalInterceptingTestStep() *signalInterceptingTestStep { // | | | // | wait up to 60s | // | | (InFlightRequestsDrained) -// | | | -// | | | -// | stoppedCh is closed s.AuditBackend.Shutdown() +// | | +// | | +// | stoppedCh is closed // | // | // <-drainedCh.Signaled() // | +// s.AuditBackend.Shutdown() +// | +// <-listenerStoppedCh +// | // <-stoppedCh // | // return nil @@ -248,25 +254,23 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t // preshutdown hook has not completed yet, new incomng request should succeed resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second) - // TODO: we expect the request to succeed with http.StatusOK, - // https://github.com/kubernetes/kubernetes/pull/110026 will fix it. - if err := assertResponseStatusCode(resultGot, http.StatusServiceUnavailable); err != nil { + if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil { t.Errorf("%s", err.Error()) } - // let the preshutdown hook issue an API call now, and then let's wait - // for it to return the result, it should succeed. + // let the preshutdown hook issue an API call now, and then + // let's wait for it to return the result. close(preShutdownHook.blockedCh) preShutdownHookResult := <-preShutdownHook.resultCh waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped) - // TODO: the API call from the preshutdown hook is expected to pass wth - // http.StatusOK, https://github.com/kubernetes/kubernetes/pull/110026 - // will fix it. - if err := assertResponseStatusCode(preShutdownHookResult, http.StatusServiceUnavailable); err != nil { + if err := assertResponseStatusCode(preShutdownHookResult, http.StatusOK); err != nil { t.Errorf("%s", err.Error()) } waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped) + // both AfterShutdownDelayDuration and PreShutdownHooksCompleted + // have been signaled, we should not be accepting new request + waitForeverUntilSignaled(t, signals.NotAcceptingNewRequest) waitForeverUntilSignaled(t, signals.HTTPServerStoppedListening) resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-503", time.Second) @@ -313,10 +317,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t t.Log("Waiting for the apiserver Run method to return") waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return") + if !fakeAudit.shutdownCompleted() { + t.Errorf("Expected AuditBackend.Shutdown to be completed") + } + if err := recorder.verify([]string{ "ShutdownInitiated", "AfterShutdownDelayDuration", "PreShutdownHooksStopped", + "NotAcceptingNewRequest", "HTTPServerStoppedListening", "InFlightRequestsDrained", }); err != nil { @@ -328,7 +337,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t // described in the following diagram // - every vertical line is an independent timeline // - the leftmost vertical line represents the go routine that -// is executing GenericAPIServer.Run methos +// is executing GenericAPIServer.Run method // - (signal) indicates that the given lifecycle signal has been fired // // stopCh @@ -344,26 +353,28 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t // | | // |--------------------------------------------| // | | +// | (NotAcceptingNewRequest) // | | // | HandlerChainWaitGroup.Wait() // | | // | (InFlightRequestsDrained) // | | // | | -// | |-------------------------------------| -// | | | -// | | close(stopHttpServerCh) -// | | | -// | s.AuditBackend.Shutdown() server.Shutdown(timeout=2s) +// |------------------------------------------------------------| // | | +// <-drainedCh.Signaled() close(stopHttpServerCh) +// | | +// s.AuditBackend.Shutdown() server.Shutdown(timeout=2s) // | | // | stop listener (net/http) // | | // | |-------------------------------------| -// <-drainedCh.Signaled() | | +// | | | // | wait up to 2s (HTTPServerStoppedListening) -// <-stoppedCh | +// <-listenerStoppedCh | // | stoppedCh is closed +// <-stoppedCh +// | // return nil // func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t *testing.T) { @@ -449,9 +460,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t // preshutdown hook has not completed yet, new incomng request should succeed resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second) - // TODO: we expect the request to succeed with http.StatusOK, - // https://github.com/kubernetes/kubernetes/pull/110026 will fix it. - if err := assertResponseStatusCode(resultGot, http.StatusTooManyRequests); err != nil { + if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil { t.Errorf("%s", err.Error()) } @@ -460,13 +469,12 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t close(preShutdownHook.blockedCh) preShutdownHookResult := <-preShutdownHook.resultCh waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped) - // TODO: the API call from the preshutdown hook is expected to pass wth - // http.StatusOK, https://github.com/kubernetes/kubernetes/pull/110026 - // will fix it. - if err := assertResponseStatusCode(preShutdownHookResult, http.StatusTooManyRequests); err != nil { + if err := assertResponseStatusCode(preShutdownHookResult, http.StatusOK); err != nil { t.Errorf("%s", err.Error()) } + waitForeverUntilSignaled(t, signals.NotAcceptingNewRequest) + // both AfterShutdownDelayDuration and PreShutdownHooksCompleted // have been signaled, any incoming request should receive 429 resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-429", time.Second) @@ -494,10 +502,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t t.Log("Waiting for the apiserver Run method to return") waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return") + if !fakeAudit.shutdownCompleted() { + t.Errorf("Expected AuditBackend.Shutdown to be completed") + } + if err := recorder.verify([]string{ "ShutdownInitiated", "AfterShutdownDelayDuration", "PreShutdownHooksStopped", + "NotAcceptingNewRequest", "InFlightRequestsDrained", "HTTPServerStoppedListening", }); err != nil { @@ -584,9 +597,9 @@ func TestPreShutdownHooks(t *testing.T) { client := newClient(true) for i := 0; i < 5; i++ { r := doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 1*time.Second) - // TODO: this is broken, we should check the response for a status code of 200 - // https://github.com/kubernetes/kubernetes/pull/110026 fixes this issue. - if r.err != nil { + err = r.err + if err == nil && r.response.StatusCode != http.StatusOK { + err = fmt.Errorf("did not get status code 200 - %#v", r.response) break } time.Sleep(time.Second) @@ -715,6 +728,7 @@ type fakeAudit struct { shutdownCh chan struct{} lock sync.Mutex audits map[string]struct{} + completed bool } func (a *fakeAudit) Run(stopCh <-chan struct{}) error { @@ -727,14 +741,24 @@ func (a *fakeAudit) Run(stopCh <-chan struct{}) error { } func (a *fakeAudit) Shutdown() { - // TODO: uncomment it in https://github.com/kubernetes/kubernetes/pull/110026 - // <-a.shutdownCh + <-a.shutdownCh + + a.lock.Lock() + defer a.lock.Unlock() + a.completed = true } func (a *fakeAudit) String() string { return "fake-audit" } +func (a *fakeAudit) shutdownCompleted() bool { + a.lock.Lock() + defer a.lock.Unlock() + + return a.completed +} + func (a *fakeAudit) ProcessEvents(events ...*auditinternal.Event) bool { a.lock.Lock() defer a.lock.Unlock() diff --git a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index fd1c94db2db..d39efa5be21 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -124,6 +124,11 @@ type lifecycleSignals struct { // preshutdown hook(s) have finished running. PreShutdownHooksStopped lifecycleSignal + // NotAcceptingNewRequest event is signaled when the server is no + // longer accepting any new request, from this point on any new + // request will receive an error. + NotAcceptingNewRequest lifecycleSignal + // InFlightRequestsDrained event is signaled when the existing requests // in flight have completed. This is used as signal to shut down the audit backends InFlightRequestsDrained lifecycleSignal @@ -148,6 +153,7 @@ func newLifecycleSignals() lifecycleSignals { ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"), AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"), PreShutdownHooksStopped: newNamedChannelWrapper("PreShutdownHooksStopped"), + NotAcceptingNewRequest: newNamedChannelWrapper("NotAcceptingNewRequest"), InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"), HasBeenReady: newNamedChannelWrapper("HasBeenReady"),