From 3182b69e970bd1fd036ff839fdf811f14e790244 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Wed, 14 Jul 2021 10:39:29 -0400 Subject: [PATCH] apiserver: add a new mode for graceful termination add a new mode for graceful termination with the new server run option 'shutdown-send-retry-after' - shutdown-send-retry-after=true: we initiate shutdown of the HTTP Server when all in-flight request(s) have been drained. during this window all incoming requests are rejected with status code 429 and the following response headers: - 'Retry-After: N' - client should retry after N seconds - 'Connection: close' - tear down the TCP connection - shutdown-send-retry-after=false: we initiate shutdown of the HTTP Server as soon as shutdown-delay-duration has elapsed. This is in keeping with the current behavior. --- .../src/k8s.io/apiserver/pkg/server/config.go | 15 +- .../apiserver/pkg/server/filters/waitgroup.go | 44 ++-- .../pkg/server/filters/with_retry_after.go | 130 +++++++++++ .../server/filters/with_retry_after_test.go | 207 ++++++++++++++++++ .../apiserver/pkg/server/genericapiserver.go | 32 ++- ...ericapiserver_graceful_termination_test.go | 181 ++++++++++++++- .../apiserver/pkg/server/lifecycle_signals.go | 29 ++- .../pkg/server/options/server_run_options.go | 16 ++ 8 files changed, 626 insertions(+), 28 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go create mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index ef68dd23c5f..d068c7e0c4c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -231,6 +231,15 @@ type Config struct { // in the storage per resource, so we can estimate width of incoming requests. 
StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker + // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP + // Server during the graceful termination of the apiserver. If true, we wait + // for non longrunning requests in flight to be drained and then initiate a + // shutdown of the HTTP Server. If false, we initiate a shutdown of the HTTP + // Server as soon as ShutdownDelayDuration has elapsed. + // If enabled, after ShutdownDelayDuration elapses, any incoming request is + // rejected with a 429 status code and a 'Retry-After' response. + ShutdownSendRetryAfter bool + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -611,7 +620,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G maxRequestBodyBytes: c.MaxRequestBodyBytes, livezClock: clock.RealClock{}, - lifecycleSignals: c.lifecycleSignals, + lifecycleSignals: c.lifecycleSignals, + ShutdownSendRetryAfter: c.ShutdownSendRetryAfter, APIServerID: c.APIServerID, StorageVersionManager: c.StorageVersionManager, @@ -789,6 +799,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithWarningRecorder(handler) handler = genericapifilters.WithCacheControl(handler) handler = genericfilters.WithHSTS(handler, c.HSTSDirectives) + if c.ShutdownSendRetryAfter { + handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + } handler = genericfilters.WithHTTPLogging(handler) if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { handler = genericapifilters.WithTracing(handler, c.TracerProvider) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go b/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go index 857ce188307..70b32c76697 100644 --- 
a/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go @@ -32,6 +32,12 @@ import ( // WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown. func WithWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup) http.Handler { + // NOTE: both WithWaitGroup and WithRetryAfter must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter, + // otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully. + return withWaitGroup(handler, longRunning, wg, isRequestExemptFromRetryAfter) +} + +func withWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup, isRequestExemptFn isRequestExemptFunc) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() requestInfo, ok := apirequest.RequestInfoFrom(ctx) @@ -41,21 +47,33 @@ func WithWaitGroup(handler http.Handler, longRunning apirequest.LongRunningReque return } - if !longRunning(req, requestInfo) { - if err := wg.Add(1); err != nil { - // When apiserver is shutting down, signal clients to retry - // There is a good chance the client hit a different server, so a tight retry is good for client responsiveness. 
- w.Header().Add("Retry-After", "1") - w.Header().Set("Content-Type", runtime.ContentTypeJSON) - w.Header().Set("X-Content-Type-Options", "nosniff") - statusErr := apierrors.NewServiceUnavailable("apiserver is shutting down").Status() - w.WriteHeader(int(statusErr.Code)) - fmt.Fprintln(w, runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &statusErr)) - return - } - defer wg.Done() + if longRunning(req, requestInfo) { + handler.ServeHTTP(w, req) + return } + if err := wg.Add(1); err != nil { + // shutdown delay duration has elapsed and SafeWaitGroup.Wait has been invoked, + // this means 'WithRetryAfter' has started sending Retry-After response. + // we are going to exempt the same set of requests that WithRetryAfter are + // exempting from being rejected with a Retry-After response. + if isRequestExemptFn(req) { + handler.ServeHTTP(w, req) + return + } + + // When apiserver is shutting down, signal clients to retry + // There is a good chance the client hit a different server, so a tight retry is good for client responsiveness. + w.Header().Add("Retry-After", "1") + w.Header().Set("Content-Type", runtime.ContentTypeJSON) + w.Header().Set("X-Content-Type-Options", "nosniff") + statusErr := apierrors.NewServiceUnavailable("apiserver is shutting down").Status() + w.WriteHeader(int(statusErr.Code)) + fmt.Fprintln(w, runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &statusErr)) + return + } + + defer wg.Done() handler.ServeHTTP(w, req) }) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go new file mode 100644 index 00000000000..c5e2daa8ed7 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go @@ -0,0 +1,130 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "net/http" + "strings" +) + +var ( + // health probes and metrics scraping are never rejected, we will continue + // serving these requests after shutdown delay duration elapses. + pathPrefixesExemptFromRetryAfter = []string{ + "/readyz", + "/livez", + "/healthz", + "/metrics", + } +) + +// isRequestExemptFunc returns true if the request should not be rejected, +// with a Retry-After response, otherwise it returns false. +type isRequestExemptFunc func(*http.Request) bool + +// retryAfterParams dictates how the Retry-After response is constructed +type retryAfterParams struct { + // TearDownConnection is true when we should send a 'Connection: close' + // header in the response so net/http can tear down the TCP connection. + TearDownConnection bool + + // Message describes why Retry-After response has been sent by the server + Message string +} + +// shouldRespondWithRetryAfterFunc returns true if the requests should +// be rejected with a Retry-After response once certain conditions are met. +// The retryAfterParams returned contains instructions on how to +// construct the Retry-After response. 
+type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) + +// WithRetryAfter rejects any incoming new request(s) with a 429 +// if the specified shutdownDelayDurationElapsedCh channel is closed +// +// It includes new request(s) on a new or an existing TCP connection +// Any new request(s) arriving after shutdownDelayDurationElapsedCh is closed +// are replied with a 429 and the following response headers: +// - 'Retry-After: N' (so client can retry after N seconds, hopefully on a new apiserver instance) +// - 'Connection: close': tear down the TCP connection +// +// TODO: is there a way to merge WithWaitGroup and this filter? +func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler { + shutdownRetryAfterParams := &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + } + + // NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter', + // otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully. 
+ return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) { + select { + case <-shutdownDelayDurationElapsedCh: + return shutdownRetryAfterParams, true + default: + return nil, false + } + }) +} + +func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + params, send := shouldRespondWithRetryAfterFn() + if !send || isRequestExemptFn(req) { + handler.ServeHTTP(w, req) + return + } + + // If we are here this means it's time to send Retry-After response + // + // Copied from net/http2 library + // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2), + // but respect "Connection" == "close" to mean sending a GOAWAY and tearing + // down the TCP connection when idle, like we do for HTTP/1. + if params.TearDownConnection { + w.Header().Set("Connection", "close") + } + + // Return a 429 status asking the client to try again after 5 seconds + w.Header().Set("Retry-After", "5") + http.Error(w, params.Message, http.StatusTooManyRequests) + }) +} + +// isRequestExemptFromRetryAfter returns true if the given request should be exempt +// from being rejected with a 'Retry-After' response. +// NOTE: both 'WithRetryAfter' and 'WithWaitGroup' filters should use this function +// to exempt the set of requests from being rejected or tracked. +func isRequestExemptFromRetryAfter(r *http.Request) bool { + return isKubeApiserverUserAgent(r) || hasExemptPathPrefix(r) +} + +// isKubeApiserverUserAgent returns true if the user-agent matches +// the one set by the local loopback. +// NOTE: we can't look up the authenticated user information from the +// request context since the authentication filter has not executed yet. 
+func isKubeApiserverUserAgent(req *http.Request) bool { + return strings.HasPrefix(req.UserAgent(), "kube-apiserver/") +} + +func hasExemptPathPrefix(r *http.Request) bool { + for _, whiteListedPrefix := range pathPrefixesExemptFromRetryAfter { + if strings.HasPrefix(r.URL.Path, whiteListedPrefix) { + return true + } + } + return false +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go new file mode 100644 index 00000000000..cb2ec54f4c6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go @@ -0,0 +1,207 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package filters + +import ( + "net/http" + "net/http/httptest" + "testing" + + utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +func TestWithRetryAfter(t *testing.T) { + tests := []struct { + name string + shutdownDelayDurationElapsedFn func() <-chan struct{} + requestURL string + userAgent string + safeWaitGroupIsWaiting bool + handlerInvoked int + closeExpected string + retryAfterExpected string + statusCodeExpected int + }{ + { + name: "retry-after disabled", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(false) + }, + requestURL: "/api/v1/namespaces", + userAgent: "foo", + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + { + name: "retry-after enabled, request is not exempt", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) + }, + requestURL: "/api/v1/namespaces", + userAgent: "foo", + handlerInvoked: 0, + closeExpected: "close", + retryAfterExpected: "5", + statusCodeExpected: http.StatusTooManyRequests, + }, + { + name: "retry-after enabled, request is exempt(/metrics)", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) + }, + requestURL: "/metrics?foo=bar", + userAgent: "foo", + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + { + name: "retry-after enabled, request is exempt(/livez)", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) + }, + requestURL: "/livez?verbose", + userAgent: "foo", + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + { + name: "retry-after enabled, request is exempt(/readyz)", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) + }, + requestURL: "/readyz?verbose", + userAgent: "foo", + handlerInvoked: 1, + 
closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + { + name: "retry-after enabled, request is exempt(/healthz)", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) + }, + requestURL: "/healthz?verbose", + userAgent: "foo", + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + { + name: "retry-after enabled, request is exempt(local loopback)", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) + }, + requestURL: "/api/v1/namespaces", + userAgent: "kube-apiserver/", + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + { + name: "nil channel", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return nil + }, + requestURL: "/api/v1/namespaces", + userAgent: "foo", + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + { + name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) + }, + requestURL: "/readyz?verbose", + userAgent: "foo", + safeWaitGroupIsWaiting: true, + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var handlerInvoked int + handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) { + handlerInvoked++ + }) + + safeWG := new(utilwaitgroup.SafeWaitGroup) + if test.safeWaitGroupIsWaiting { + // mark the safe wait group as waiting, it's a blocking call + // but since the WaitGroup counter is zero it should not block + safeWG.Wait() + } + + wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool { + return false + }, safeWG) + wrapped = WithRetryAfter(wrapped, 
test.shutdownDelayDurationElapsedFn()) + + req, err := http.NewRequest(http.MethodGet, test.requestURL, nil) + if err != nil { + t.Fatalf("failed to create new http request - %v", err) + } + + req.Header.Set("User-Agent", test.userAgent) + req = req.WithContext(apirequest.WithRequestInfo(req.Context(), &apirequest.RequestInfo{})) + + w := httptest.NewRecorder() + wrapped.ServeHTTP(w, req) + + if test.handlerInvoked != handlerInvoked { + t.Errorf("expected the handler to be invoked: %d timed, but got: %d", test.handlerInvoked, handlerInvoked) + } + if test.statusCodeExpected != w.Result().StatusCode { + t.Errorf("expected status code: %d, but got: %d", test.statusCodeExpected, w.Result().StatusCode) + } + + closeGot := w.Header().Get("Connection") + if test.closeExpected != closeGot { + t.Errorf("expected Connection close: %s, but got: %s", test.closeExpected, closeGot) + } + + retryAfterGot := w.Header().Get("Retry-After") + if test.retryAfterExpected != retryAfterGot { + t.Errorf("expected Retry-After: %s, but got: %s", test.retryAfterExpected, retryAfterGot) + } + }) + } +} + +func newChannel(closed bool) <-chan struct{} { + ch := make(chan struct{}) + if closed { + close(ch) + } + return ch +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 5f4e8e348a8..8104cd2f05e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -213,6 +213,15 @@ type GenericAPIServer struct { // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver. lifecycleSignals lifecycleSignals + + // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP + // Server during the graceful termination of the apiserver. If true, we wait + // for non longrunning requests in flight to be drained and then initiate a + // shutdown of the HTTP Server. 
If false, we initiate a shutdown of the HTTP + // Server as soon as ShutdownDelayDuration has elapsed. + // If enabled, after ShutdownDelayDuration elapses, any incoming request is + // rejected with a 429 status code and a 'Retry-After' response. + ShutdownSendRetryAfter bool } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -352,7 +361,22 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { }() // close socket after delayed stopCh - stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled()) + drainedCh := s.lifecycleSignals.InFlightRequestsDrained + stopHttpServerCh := delayedStopCh.Signaled() + shutdownTimeout := s.ShutdownTimeout + if s.ShutdownSendRetryAfter { + // when this mode is enabled, we do the following: + // - the server will continue to listen until all existing requests in flight + // (not including active long running requests) have been drained. + // - once drained, http Server Shutdown is invoked with a timeout of 2s, + // net/http waits for 1s for the peer to respond to a GO_AWAY frame, so + // we should wait for a minimum of 2s + stopHttpServerCh = drainedCh.Signaled() + shutdownTimeout = 2 * time.Second + klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout) + } + + stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout) if err != nil { return err } @@ -363,7 +387,6 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name()) }() - drainedCh := s.lifecycleSignals.InFlightRequestsDrained go func() { defer drainedCh.Signal() defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name()) @@ -397,7 +420,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // NonBlockingRun spawns the 
secure http server. An error is // returned if the secure port cannot be listened on. // The returned channel is closed when the (asynchronous) termination is finished. -func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) { +func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) { // Use an stop channel to allow graceful shutdown without dropping audit events // after http server shutdown. auditStopCh := make(chan struct{}) @@ -416,8 +439,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan var listenerStoppedCh <-chan struct{} if s.SecureServingInfo != nil && s.Handler != nil { var err error - klog.V(1).Infof("[graceful-termination] ShutdownTimeout=%s", s.ShutdownTimeout) - stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, s.ShutdownTimeout, internalStopCh) + stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, shutdownTimeout, internalStopCh) if err != nil { close(internalStopCh) close(auditStopCh) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index 8e2554d929a..d7956c9f64e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -111,7 +111,7 @@ func newStep(fn func()) *step { } func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t *testing.T) { - s := newGenericAPIServer(t) + s := newGenericAPIServer(t, false) // record the termination events in the order they are signaled var signalOrderLock sync.Mutex @@ -143,9 +143,9 @@ func 
TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t delayedStopVerificationStepExecuted = true t.Log("Before ShutdownDelayDuration elapses new request(s) should be served") resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second) - requestMustSucceed(t, resultGot) + assertResponse(t, resultGot, http.StatusOK) resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second) - requestMustSucceed(t, resultGot) + assertResponse(t, resultGot, http.StatusOK) }) steps := func(before bool, name string, e lifecycleSignal) { // Before AfterShutdownDelayDuration event is signaled, the test @@ -208,7 +208,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t case <-time.After(5 * time.Second): t.Fatal("Expected the server to send a response") } - requestMustSucceed(t, inFlightResultGot) + assertResponse(t, inFlightResultGot, http.StatusOK) t.Log("Waiting for the apiserver Run method to return") select { @@ -232,6 +232,153 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t }() } +func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t *testing.T) { + s := newGenericAPIServer(t, true) + + // record the termination events in the order they are signaled + var signalOrderLock sync.Mutex + signalOrderGot := make([]string, 0) + recordOrderFn := func(before bool, name string, e lifecycleSignal) { + if !before { + return + } + signalOrderLock.Lock() + defer signalOrderLock.Unlock() + signalOrderGot = append(signalOrderGot, name) + } + + // handler for a request that we want to keep in flight through to the end + inFlightRequestBlockedCh, inFlightStartedCh := make(chan result), make(chan struct{}) + inFlightRequest := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + close(inFlightStartedCh) + // 
this request handler blocks until we deliberately unblock it. + <-inFlightRequestBlockedCh + w.WriteHeader(http.StatusOK) + }) + s.Handler.NonGoRestfulMux.Handle("/in-flight-request-as-designed", inFlightRequest) + + connReusingClient := newClient(false) + doer := setupDoer(t, s.SecureServingInfo) + + var delayedStopVerificationStepExecuted bool + delayedStopVerificationStep := newStep(func() { + delayedStopVerificationStepExecuted = true + t.Log("Before ShutdownDelayDuration elapses new request(s) should be served") + resultGot := doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-succeed", time.Second) + assertResponse(t, resultGot, http.StatusOK) + resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second) + assertResponse(t, resultGot, http.StatusOK) + }) + steps := func(before bool, name string, e lifecycleSignal) { + // Before AfterShutdownDelayDuration event is signaled, the test + // will send request(s) to assert on expected behavior. 
+ if name == "AfterShutdownDelayDuration" && before { + // it unblocks the verification step and waits for it to complete + <-delayedStopVerificationStep.done() + } + } + + // wrap the termination signals of the GenericAPIServer so the test can inject its own callback + wrapLifecycleSignals(t, &s.lifecycleSignals, func(before bool, name string, e lifecycleSignal) { + recordOrderFn(before, name, e) + steps(before, name, e) + }) + + // start the API server + stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(runCompletedCh) + s.PrepareRun().Run(stopCh) + }() + waitForAPIServerStarted(t, doer) + + // step 1: fire a request that we want to keep in-flight through to the end + inFlightResultCh := make(chan result) + go func() { + resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/in-flight-request-as-designed", 0) + inFlightResultCh <- resultGot + }() + select { + case <-inFlightStartedCh: + case <-time.After(5 * time.Second): + t.Fatalf("Waited for 5s for the in-flight request to reach the server") + } + + //step 1: /readyz should return OK + resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/readyz", time.Second) + assertResponse(t, resultGot, http.StatusOK) + + // step 2: signal termination event: initiate a shutdown + close(stopCh) + + // step 3: /readyz must return an error, but we need to give it some time + err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + resultGot := doer.Do(connReusingClient, func(httptrace.GotConnInfo) {}, "/readyz", time.Second) + // wait until we have a non 200 response + if resultGot.response != nil && resultGot.response.StatusCode == http.StatusOK { + return false, nil + } + + assertResponse(t, resultGot, http.StatusInternalServerError) + return true, nil + }) + if err != nil { + t.Errorf("Expected /readyz to return 500 status code, but got: %v", err) + } + + // step 4: before ShutdownDelayDuration 
elapses new request(s) should be served successfully. + delayedStopVerificationStep.execute() + if !delayedStopVerificationStepExecuted { + t.Fatal("Expected the AfterShutdownDelayDuration verification step to execute") + } + + // step 5: ShutdownDelayDuration has elapsed, all incoming requests should receive 429 + t.Log("Verify that new incoming request(s) get 429") + resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-fail-with-429", time.Second) + requestMustFailWithRetryHeader(t, resultGot, http.StatusTooManyRequests) + resultGot = doer.Do(connReusingClient, shouldReuseConnection(t), "/echo?message=request-on-an-existing-connection-should-fail-with-429", time.Second) + requestMustFailWithRetryHeader(t, resultGot, http.StatusTooManyRequests) + + // step 6: we still have a request in flight, let it unblock and we expect the request to succeed. + close(inFlightRequestBlockedCh) + var inFlightResultGot result + select { + case inFlightResultGot = <-inFlightResultCh: + case <-time.After(5 * time.Second): + t.Fatal("Expected the server to send a response") + } + assertResponse(t, inFlightResultGot, http.StatusOK) + + // step 7: wait for the HTTP Server listener to have stopped + httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening + select { + case <-httpServerStoppedListeningCh.Signaled(): + case <-time.After(5 * time.Second): + t.Fatal("Expected the server to signal HTTPServerStoppedListening event") + } + + t.Log("Waiting for the apiserver Run method to return") + select { + case <-runCompletedCh: + case <-time.After(5 * time.Second): + t.Fatal("Expected the apiserver Run method to return") + } + + lifecycleSignalOrderExpected := []string{ + string("ShutdownInitiated"), + string("AfterShutdownDelayDuration"), + string("InFlightRequestsDrained"), + string("HTTPServerStoppedListening"), + } + func() { + signalOrderLock.Lock() + defer signalOrderLock.Unlock() + if 
!reflect.DeepEqual(lifecycleSignalOrderExpected, signalOrderGot) { + t.Errorf("Expected order of termination event signal to match, diff: %s", cmp.Diff(lifecycleSignalOrderExpected, signalOrderGot)) + } + }() +} + func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) { return func(ci httptrace.GotConnInfo) { if !ci.Reused { @@ -248,13 +395,27 @@ func shouldUseNewConnection(t *testing.T) func(httptrace.GotConnInfo) { } } -func requestMustSucceed(t *testing.T, resultGot result) { +func assertResponse(t *testing.T, resultGot result, statusCodeExpected int) { if resultGot.err != nil { t.Errorf("Expected no error, but got: %v", resultGot.err) return } - if resultGot.response.StatusCode != http.StatusOK { - t.Errorf("Expected Status Code: %d, but got: %d", http.StatusOK, resultGot.response.StatusCode) + if resultGot.response.StatusCode != statusCodeExpected { + t.Errorf("Expected Status Code: %d, but got: %d", statusCodeExpected, resultGot.response.StatusCode) + } +} + +func requestMustFailWithRetryHeader(t *testing.T, resultGot result, statusCodedExpected int) { + if resultGot.err != nil { + t.Errorf("Expected no error, but got: %v", resultGot.err) + return + } + if statusCodedExpected != resultGot.response.StatusCode { + t.Errorf("Expected Status Code: %d, but got: %d", statusCodedExpected, resultGot.response.StatusCode) + } + retryAfterGot := resultGot.response.Header.Get("Retry-After") + if retryAfterGot != "5" { + t.Errorf("Expected Retry-After Response Header, but got: %v", resultGot.response) } } @@ -351,11 +512,15 @@ func newClient(useNewConnection bool) *http.Client { } } -func newGenericAPIServer(t *testing.T) *GenericAPIServer { +func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer { config, _ := setUp(t) config.ShutdownDelayDuration = 100 * time.Millisecond + config.ShutdownSendRetryAfter = keepListening config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { handler := 
genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup) + if c.ShutdownSendRetryAfter { + handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) return handler } diff --git a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index fda4f095199..376167a9c58 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -28,7 +28,9 @@ Events: - InFlightRequestsDrained: all in flight request(s) have been drained - HasBeenReady is signaled when the readyz endpoint succeeds for the first time -The following is a sequence of shutdown events that we expect to see during termination: +The following is a sequence of shutdown events that we expect to see with + 'ShutdownSendRetryAfter' = false: + T0: ShutdownInitiated: KILL signal received - /readyz starts returning red - run pre shutdown hooks @@ -54,6 +56,31 @@ T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have any request in flight has a hard timeout of 60s. - it's time to call 'Shutdown' on the audit events since all in flight request(s) have drained. 
+ + +The following is a sequence of shutdown events that we expect to see with + 'ShutdownSendRetryAfter' = true: + +T0: ShutdownInitiated: KILL signal received + - /readyz starts returning red + - run pre shutdown hooks + +T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed + - the default value of 'ShutdownDelayDuration' is '70s' + - the HTTP Server will continue to listen + - the apiserver is not accepting new request(s) + - it includes new request(s) on a new or an existing TCP connection + - new request(s) arriving after this point are rejected with a 429 + and the response headers: 'Retry-After: 5' and 'Connection: close' + - note: these new request(s) will not show up in audit logs + +T0 + 70s + up to 60s: InFlightRequestsDrained: existing in flight requests have been drained + - long running requests are outside of this scope + - up to 60s: the default value of 'ShutdownTimeout' is 60s, this means that + any request in flight has a hard timeout of 60s. + - server.Shutdown is called, the HTTP Server stops listening immediately + - the HTTP Server waits gracefully for existing requests to complete + up to '2s' (it's hard coded right now) */ // lifecycleSignal encapsulates a named apiserver event diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index 9758eec11b1..07a887a5399 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -54,6 +54,15 @@ type ServerRunOptions struct { // apiserver library can wire it to a flag. MaxRequestBodyBytes int64 EnablePriorityAndFairness bool + + // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP + // Server during the graceful termination of the apiserver.
If true, we wait + // for non longrunning requests in flight to be drained and then initiate a + // shutdown of the HTTP Server. If false, we initiate a shutdown of the HTTP + // Server as soon as ShutdownDelayDuration has elapsed. + // If enabled, after ShutdownDelayDuration elapses, any incoming request is + // rejected with a 429 status code and a 'Retry-After' response. + ShutdownSendRetryAfter bool } func NewServerRunOptions() *ServerRunOptions { @@ -68,6 +77,7 @@ func NewServerRunOptions() *ServerRunOptions { JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, EnablePriorityAndFairness: true, + ShutdownSendRetryAfter: false, } } @@ -86,6 +96,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes c.MaxRequestBodyBytes = s.MaxRequestBodyBytes c.PublicAddress = s.AdvertiseAddress + c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter return nil } @@ -245,5 +256,10 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+ "has elapsed. This can be used to allow load balancer to stop sending traffic to this server.") + fs.BoolVar(&s.ShutdownSendRetryAfter, "shutdown-send-retry-after", s.ShutdownSendRetryAfter, ""+ + "If true the HTTP Server will continue listening until all non long running request(s) in flight have been drained, "+ + "during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+ + "in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.") + utilfeature.DefaultMutableFeatureGate.AddFlag(fs) }