Merge pull request #110026 from tkashem/graceful-test

apiserver: fix preshutdown hook behavior with graceful termination
This commit is contained in:
Kubernetes Prow Robot 2022-05-25 09:38:57 -07:00 committed by GitHub
commit e0dbea2443
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 73 deletions

View File

@ -850,7 +850,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {

View File

@ -411,17 +411,6 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
}
s.installReadyz()
// Register audit backend preShutdownHook.
if s.AuditBackend != nil {
err := s.AddPreShutdownHook("audit-backend", func() error {
s.AuditBackend.Shutdown()
return nil
})
if err != nil {
klog.Errorf("Failed to add pre-shutdown hook for audit-backend %s", err)
}
}
return preparedGenericAPIServer{s}
}
@ -439,21 +428,31 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// | |
// AfterShutdownDelayDuration (delayedStopCh) PreShutdownHooksStopped (preShutdownHooksHasStoppedCh)
// | |
// |---------------------------------- |
// | | |
// | (HandlerChainWaitGroup::Wait) |
// | | |
// | InFlightRequestsDrained (drainedCh) |
// | | |
// [without ShutdownSendRetryAfter] [with ShutdownSendRetryAfter] |
// | | |
// ---------------------------------------------------------
// |
// stopHttpServerCh
// |
// listenerStoppedCh
// |
// HTTPServerStoppedListening (httpServerStoppedListeningCh)
// |-------------------------------------------------------|
// |
// |
// NotAcceptingNewRequest (notAcceptingNewRequestCh)
// |
// |
// |---------------------------------------------------------|
// | | | |
// [without [with | |
// ShutdownSendRetryAfter] ShutdownSendRetryAfter] | |
// | | | |
// | ---------------| |
// | | |
// | (HandlerChainWaitGroup::Wait) |
// | | |
// | InFlightRequestsDrained (drainedCh) |
// | | |
// ----------------------------------------|-----------------|
// | |
// stopHttpServerCh (AuditBackend::Shutdown())
// |
// listenerStoppedCh
// |
// HTTPServerStoppedListening (httpServerStoppedListeningCh)
//
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
@ -494,8 +493,6 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
}()
// close socket after delayed stopCh
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
delayedStopOrDrainedCh := delayedStopCh.Signaled()
shutdownTimeout := s.ShutdownTimeout
if s.ShutdownSendRetryAfter {
// when this mode is enabled, we do the following:
@ -504,19 +501,22 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// - once drained, http Server Shutdown is invoked with a timeout of 2s,
// net/http waits for 1s for the peer to respond to a GO_AWAY frame, so
// we should wait for a minimum of 2s
delayedStopOrDrainedCh = drainedCh.Signaled()
shutdownTimeout = 2 * time.Second
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
}
// pre-shutdown hooks need to finish before we stop the http server
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := make(chan struct{})
go func() {
defer close(stopHttpServerCh)
<-delayedStopOrDrainedCh
<-preShutdownHooksHasStoppedCh.Signaled()
timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
if s.ShutdownSendRetryAfter {
timeToStopHttpServerCh = drainedCh.Signaled()
}
<-timeToStopHttpServerCh
}()
// Start the audit backend before any request comes in. This means we must call Backend.Run
@ -540,13 +540,29 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
}()
// we stop accepting new requests as soon as both ShutdownDelayDuration has
// elapsed and preshutdown hooks have completed.
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
go func() {
defer drainedCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
defer notAcceptingNewRequestCh.Signal()
// wait for the delayed stopCh before closing the handler chain
<-delayedStopCh.Signaled()
// Additionally wait for preshutdown hooks to also be finished, as some of them need
// to send API calls to clean up after themselves (e.g. lease reconcilers removing
// itself from the active servers).
<-preShutdownHooksHasStoppedCh.Signaled()
}()
go func() {
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
defer drainedCh.Signal()
// wait for the NotAcceptingNewRequest signal before closing the handler chain (it rejects everything after Wait has been called).
<-notAcceptingNewRequestCh.Signaled()
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
// once HandlerChainWaitGroup.Wait is invoked, the apiserver is
// expected to reject any incoming request with a {503, Retry-After}
@ -578,11 +594,17 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
if err != nil {
return err
}
klog.V(1).Info("[graceful-termination] RunPreShutdownHooks has completed")
// Wait for all requests in flight to drain, bounded by the RequestTimeout variable.
<-drainedCh.Signaled()
if s.AuditBackend != nil {
s.AuditBackend.Shutdown()
klog.V(1).InfoS("[graceful-termination] audit backend shutdown completed")
}
// wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished.
<-listenerStoppedCh
<-stoppedCh
klog.V(1).Info("[graceful-termination] apiserver is exiting")
@ -612,10 +634,6 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow
go func() {
<-stopCh
close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh
}
s.HandlerChainWaitGroup.Wait()
}()
s.RunPostStartHooks(stopCh)

View File

@ -85,6 +85,7 @@ func wrapLifecycleSignalsWithRecorer(t *testing.T, signals *lifecycleSignals, be
// an asynchronous process.
signals.AfterShutdownDelayDuration = wrapLifecycleSignal(t, signals.AfterShutdownDelayDuration, before, nil)
signals.PreShutdownHooksStopped = wrapLifecycleSignal(t, signals.PreShutdownHooksStopped, before, nil)
signals.NotAcceptingNewRequest = wrapLifecycleSignal(t, signals.NotAcceptingNewRequest, before, nil)
signals.HTTPServerStoppedListening = wrapLifecycleSignal(t, signals.HTTPServerStoppedListening, before, nil)
signals.InFlightRequestsDrained = wrapLifecycleSignal(t, signals.InFlightRequestsDrained, before, nil)
signals.ShutdownInitiated = wrapLifecycleSignal(t, signals.ShutdownInitiated, before, nil)
@ -139,6 +140,7 @@ func newSignalInterceptingTestStep() *signalInterceptingTestStep {
// | |
// |--------------------------------------------|
// | |
// | (NotAcceptingNewRequest)
// | |
// | |-------------------------------------------------|
// | | |
@ -154,13 +156,17 @@ func newSignalInterceptingTestStep() *signalInterceptingTestStep {
// | | |
// | wait up to 60s |
// | | (InFlightRequestsDrained)
// | | |
// | | |
// | stoppedCh is closed s.AuditBackend.Shutdown()
// | |
// | |
// | stoppedCh is closed
// |
// |
// <-drainedCh.Signaled()
// |
// s.AuditBackend.Shutdown()
// |
// <-listenerStoppedCh
// |
// <-stoppedCh
// |
// return nil
@ -248,25 +254,23 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
// preshutdown hook has not completed yet, new incoming request should succeed
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
// TODO: we expect the request to succeed with http.StatusOK,
// https://github.com/kubernetes/kubernetes/pull/110026 will fix it.
if err := assertResponseStatusCode(resultGot, http.StatusServiceUnavailable); err != nil {
if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
t.Errorf("%s", err.Error())
}
// let the preshutdown hook issue an API call now, and then let's wait
// for it to return the result, it should succeed.
// let the preshutdown hook issue an API call now, and then
// let's wait for it to return the result.
close(preShutdownHook.blockedCh)
preShutdownHookResult := <-preShutdownHook.resultCh
waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped)
// TODO: the API call from the preshutdown hook is expected to pass wth
// http.StatusOK, https://github.com/kubernetes/kubernetes/pull/110026
// will fix it.
if err := assertResponseStatusCode(preShutdownHookResult, http.StatusServiceUnavailable); err != nil {
if err := assertResponseStatusCode(preShutdownHookResult, http.StatusOK); err != nil {
t.Errorf("%s", err.Error())
}
waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped)
// both AfterShutdownDelayDuration and PreShutdownHooksStopped
// have been signaled, we should not be accepting new requests
waitForeverUntilSignaled(t, signals.NotAcceptingNewRequest)
waitForeverUntilSignaled(t, signals.HTTPServerStoppedListening)
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-503", time.Second)
@ -313,10 +317,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
t.Log("Waiting for the apiserver Run method to return")
waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return")
if !fakeAudit.shutdownCompleted() {
t.Errorf("Expected AuditBackend.Shutdown to be completed")
}
if err := recorder.verify([]string{
"ShutdownInitiated",
"AfterShutdownDelayDuration",
"PreShutdownHooksStopped",
"NotAcceptingNewRequest",
"HTTPServerStoppedListening",
"InFlightRequestsDrained",
}); err != nil {
@ -328,7 +337,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
// described in the following diagram
// - every vertical line is an independent timeline
// - the leftmost vertical line represents the go routine that
// is executing GenericAPIServer.Run methos
// is executing GenericAPIServer.Run method
// - (signal) indicates that the given lifecycle signal has been fired
//
// stopCh
@ -344,26 +353,28 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
// | |
// |--------------------------------------------|
// | |
// | (NotAcceptingNewRequest)
// | |
// | HandlerChainWaitGroup.Wait()
// | |
// | (InFlightRequestsDrained)
// | |
// | |
// | |-------------------------------------|
// | | |
// | | close(stopHttpServerCh)
// | | |
// | s.AuditBackend.Shutdown() server.Shutdown(timeout=2s)
// |------------------------------------------------------------|
// | |
// <-drainedCh.Signaled() close(stopHttpServerCh)
// | |
// s.AuditBackend.Shutdown() server.Shutdown(timeout=2s)
// | |
// | stop listener (net/http)
// | |
// | |-------------------------------------|
// <-drainedCh.Signaled() | |
// | | |
// | wait up to 2s (HTTPServerStoppedListening)
// <-stoppedCh |
// <-listenerStoppedCh |
// | stoppedCh is closed
// <-stoppedCh
// |
// return nil
//
func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t *testing.T) {
@ -449,9 +460,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
// preshutdown hook has not completed yet, new incoming request should succeed
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
// TODO: we expect the request to succeed with http.StatusOK,
// https://github.com/kubernetes/kubernetes/pull/110026 will fix it.
if err := assertResponseStatusCode(resultGot, http.StatusTooManyRequests); err != nil {
if err := assertResponseStatusCode(resultGot, http.StatusOK); err != nil {
t.Errorf("%s", err.Error())
}
@ -460,13 +469,12 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
close(preShutdownHook.blockedCh)
preShutdownHookResult := <-preShutdownHook.resultCh
waitForeverUntilSignaled(t, signals.PreShutdownHooksStopped)
// TODO: the API call from the preshutdown hook is expected to pass wth
// http.StatusOK, https://github.com/kubernetes/kubernetes/pull/110026
// will fix it.
if err := assertResponseStatusCode(preShutdownHookResult, http.StatusTooManyRequests); err != nil {
if err := assertResponseStatusCode(preShutdownHookResult, http.StatusOK); err != nil {
t.Errorf("%s", err.Error())
}
waitForeverUntilSignaled(t, signals.NotAcceptingNewRequest)
// both AfterShutdownDelayDuration and PreShutdownHooksStopped
// have been signaled, any incoming request should receive 429
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-429", time.Second)
@ -494,10 +502,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
t.Log("Waiting for the apiserver Run method to return")
waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return")
if !fakeAudit.shutdownCompleted() {
t.Errorf("Expected AuditBackend.Shutdown to be completed")
}
if err := recorder.verify([]string{
"ShutdownInitiated",
"AfterShutdownDelayDuration",
"PreShutdownHooksStopped",
"NotAcceptingNewRequest",
"InFlightRequestsDrained",
"HTTPServerStoppedListening",
}); err != nil {
@ -584,9 +597,9 @@ func TestPreShutdownHooks(t *testing.T) {
client := newClient(true)
for i := 0; i < 5; i++ {
r := doer.Do(client, func(httptrace.GotConnInfo) {}, fmt.Sprintf("/echo?message=attempt-%d", i), 1*time.Second)
// TODO: this is broken, we should check the response for a status code of 200
// https://github.com/kubernetes/kubernetes/pull/110026 fixes this issue.
if r.err != nil {
err = r.err
if err == nil && r.response.StatusCode != http.StatusOK {
err = fmt.Errorf("did not get status code 200 - %#v", r.response)
break
}
time.Sleep(time.Second)
@ -715,6 +728,7 @@ type fakeAudit struct {
shutdownCh chan struct{}
lock sync.Mutex
audits map[string]struct{}
completed bool
}
func (a *fakeAudit) Run(stopCh <-chan struct{}) error {
@ -727,14 +741,24 @@ func (a *fakeAudit) Run(stopCh <-chan struct{}) error {
}
// Shutdown blocks until a receive on a.shutdownCh completes and then,
// while holding a.lock, records that the shutdown has completed.
func (a *fakeAudit) Shutdown() {
// TODO: uncomment it in https://github.com/kubernetes/kubernetes/pull/110026
// <-a.shutdownCh
// block here until shutdownCh yields a value or is closed
<-a.shutdownCh
// completed is guarded by lock; shutdownCompleted reads it under the same lock
a.lock.Lock()
defer a.lock.Unlock()
a.completed = true
}
// String returns the fixed name "fake-audit" for this backend.
func (a *fakeAudit) String() string {
	const name = "fake-audit"
	return name
}
// shutdownCompleted reports the current value of the completed flag,
// reading it while a.lock is held.
func (a *fakeAudit) shutdownCompleted() bool {
	a.lock.Lock()
	done := a.completed
	a.lock.Unlock()
	return done
}
func (a *fakeAudit) ProcessEvents(events ...*auditinternal.Event) bool {
a.lock.Lock()
defer a.lock.Unlock()

View File

@ -124,6 +124,11 @@ type lifecycleSignals struct {
// preshutdown hook(s) have finished running.
PreShutdownHooksStopped lifecycleSignal
// NotAcceptingNewRequest event is signaled when the server is no
// longer accepting any new request; from this point on, any new
// request will receive an error.
NotAcceptingNewRequest lifecycleSignal
// InFlightRequestsDrained event is signaled when the existing requests
// in flight have completed. This is used as signal to shut down the audit backends
InFlightRequestsDrained lifecycleSignal
@ -148,6 +153,7 @@ func newLifecycleSignals() lifecycleSignals {
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"),
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"),
PreShutdownHooksStopped: newNamedChannelWrapper("PreShutdownHooksStopped"),
NotAcceptingNewRequest: newNamedChannelWrapper("NotAcceptingNewRequest"),
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),