diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index d068c7e0c4c..0739a3baa94 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -240,6 +240,15 @@ type Config struct { // rejected with a 429 status code and a 'Retry-After' response. ShutdownSendRetryAfter bool + // StartupSendRetryAfterUntilReady once set will reject incoming requests with + // a 429 status code and a 'Retry-After' response header until the apiserver + // hasn't fully initialized. + // This option ensures that the system stays consistent even when requests + // are received before the server has been initialized. + // In particular, it prevents child deletion in case of GC or/and orphaned + // content in case of the namespaces controller. + StartupSendRetryAfterUntilReady bool + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -473,6 +482,46 @@ func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) { } } +// shouldAddWithRetryAfterFilter returns an appropriate ShouldRespondWithRetryAfterFunc +// if the apiserver should respond with a Retry-After response header based on option +// 'shutdown-send-retry-after' or 'startup-send-retry-after-until-ready'. +func (c *Config) shouldAddWithRetryAfterFilter() genericfilters.ShouldRespondWithRetryAfterFunc { + if !(c.ShutdownSendRetryAfter || c.StartupSendRetryAfterUntilReady) { + return nil + } + + // follow lifecycle, avoiding go routines per request + const ( + startup int32 = iota + running + terminating + ) + state := startup + go func() { + <-c.lifecycleSignals.HasBeenReady.Signaled() + atomic.StoreInt32(&state, running) + <-c.lifecycleSignals.AfterShutdownDelayDuration.Signaled() + atomic.StoreInt32(&state, terminating) + }() + + return func() (*genericfilters.RetryAfterParams, bool) { + state := atomic.LoadInt32(&state) + switch { + case c.StartupSendRetryAfterUntilReady && state == startup: + return &genericfilters.RetryAfterParams{ + Message: "The apiserver hasn't been fully initialized yet, please try again later.", + }, true + case c.ShutdownSendRetryAfter && state == terminating: + return &genericfilters.RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true + default: + return nil, false + } + } +} + // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { @@ -799,8 +848,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithWarningRecorder(handler) handler = genericapifilters.WithCacheControl(handler) handler = genericfilters.WithHSTS(handler, c.HSTSDirectives) - if c.ShutdownSendRetryAfter { - handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + if shouldRespondWithRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRespondWithRetryAfterFn != nil { + handler = genericfilters.WithRetryAfter(handler, shouldRespondWithRetryAfterFn) } handler = genericfilters.WithHTTPLogging(handler) if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 532cf49d3d4..e60f4a51603 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -345,6 +346,159 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { } } +func TestShouldRespondWithRetryAfterFunc(t *testing.T) { + tests := []struct { + name string + config *Config + addWithRetryAfterFilterExpected bool + sendRetryAfterExpected bool + retryAfterParamsExpected *genericfilters.RetryAfterParams + }{ + { + name: "both shutdown-send-retry-after and startup-send-retry-after-until-ready are not enabled", + config: &Config{ + StartupSendRetryAfterUntilReady: false, + ShutdownSendRetryAfter: false, + }, + addWithRetryAfterFilterExpected: false, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "shutdown-send-retry-after is enabled, the apserver is shutting down", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: false, + ShutdownSendRetryAfter: true, + } + c.lifecycleSignals.HasBeenReady.Signal() + c.lifecycleSignals.AfterShutdownDelayDuration.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, + }, + { + name: "shutdown-send-retry-after is enabled, the apserver is not in shutdown mode", + config: &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: false, + ShutdownSendRetryAfter: true, + }, + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "startup-send-retry-after-until-ready is enabled, the apserver is not ready yet", + config: &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: false, + }, + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: false, + Message: "The apiserver hasn't been fully initialized yet, please try again later.", + }, + }, + { + name: "startup-send-retry-after-until-ready is enabled, the apserver is ready", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: false, + } + c.lifecycleSignals.HasBeenReady.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is not ready", + config: &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: true, + }, + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: false, + Message: "The apiserver hasn't been fully initialized yet, please try again later.", + }, + }, + { + name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is ready", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: true, + } + c.lifecycleSignals.HasBeenReady.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is shutting down", + config: func() *Config { + c := &Config{ + lifecycleSignals: newLifecycleSignals(), + StartupSendRetryAfterUntilReady: true, + ShutdownSendRetryAfter: true, + } + c.lifecycleSignals.HasBeenReady.Signal() + c.lifecycleSignals.AfterShutdownDelayDuration.Signal() + return c + }(), + addWithRetryAfterFilterExpected: true, + sendRetryAfterExpected: true, + retryAfterParamsExpected: &genericfilters.RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + shouldRespondWithRetryAfterFn := test.config.shouldAddWithRetryAfterFilter() + + // we need to sleep some time to allow the goroutine launched by + // shouldAddWithRetryAfterFilter to finish + time.Sleep(100 * time.Millisecond) + + if test.addWithRetryAfterFilterExpected != (shouldRespondWithRetryAfterFn != nil) { + t.Errorf("Expected add WithRetryAfter: %t, but got: %t", test.addWithRetryAfterFilterExpected, shouldRespondWithRetryAfterFn != nil) + } + if !test.addWithRetryAfterFilterExpected { + return + } + + paramsGot, sendRetryAfterGot := shouldRespondWithRetryAfterFn() + if test.sendRetryAfterExpected != sendRetryAfterGot { + t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot) + } + if !reflect.DeepEqual(test.retryAfterParamsExpected, paramsGot) { + t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, paramsGot)) + } + }) + } +} + type testBackend struct { events []*auditinternal.Event diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go index c5e2daa8ed7..c3c5cda1f12 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go @@ -36,8 +36,8 @@ var ( // with a Retry-After response, otherwise it returns false. type isRequestExemptFunc func(*http.Request) bool -// retryAfterParams dictates how the Retry-After response is constructed -type retryAfterParams struct { +// RetryAfterParams dictates how the Retry-After response is constructed +type RetryAfterParams struct { // TearDownConnection is true when we should send a 'Connection: close' // header in the response so net/http can tear down the TCP connection. TearDownConnection bool @@ -46,11 +46,11 @@ type retryAfterParams struct { Message string } -// shouldRespondWithRetryAfterFunc returns true if the requests should +// ShouldRespondWithRetryAfterFunc returns true if the requests should // be rejected with a Retry-After response once certain conditions are met. -// The retryAfterParams returned contains instructions on how to +// The RetryAfterParams returned contains instructions on how to // construct the Retry-After response. -type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) +type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool) // WithRetryAfter rejects any incoming new request(s) with a 429 // if the specified shutdownDelayDurationElapsedFn channel is closed @@ -62,25 +62,13 @@ type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) // - 'Connection: close': tear down the TCP connection // // TODO: is there a way to merge WithWaitGroup and this filter? -func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler { - shutdownRetryAfterParams := &retryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - } - +func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler { // NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter, // otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully. - return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) { - select { - case <-shutdownDelayDurationElapsedCh: - return shutdownRetryAfterParams, true - default: - return nil, false - } - }) + return withRetryAfter(handler, isRequestExemptFromRetryAfter, when) } -func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler { +func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { params, send := shouldRespondWithRetryAfterFn() if !send || isRequestExemptFn(req) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go index cb2ec54f4c6..0200e6b7ca0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go @@ -27,20 +27,20 @@ import ( func TestWithRetryAfter(t *testing.T) { tests := []struct { - name string - shutdownDelayDurationElapsedFn func() <-chan struct{} - requestURL string - userAgent string - safeWaitGroupIsWaiting bool - handlerInvoked int - closeExpected string - retryAfterExpected string - statusCodeExpected int + name string + when ShouldRespondWithRetryAfterFunc + requestURL string + userAgent string + safeWaitGroupIsWaiting bool + handlerInvoked int + closeExpected string + retryAfterExpected string + statusCodeExpected int }{ { name: "retry-after disabled", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(false) + when: func() (*RetryAfterParams, bool) { + return nil, false }, requestURL: "/api/v1/namespaces", userAgent: "foo", @@ -51,8 +51,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is not exempt", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/api/v1/namespaces", userAgent: "foo", @@ -61,10 +64,28 @@ func TestWithRetryAfter(t *testing.T) { retryAfterExpected: "5", statusCodeExpected: http.StatusTooManyRequests, }, + { + name: "retry-after enabled, request is not exempt, no connection tear down", + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: false, + Message: "The apiserver is shutting down, please try again later.", + }, true + }, + requestURL: "/api/v1/namespaces", + userAgent: "foo", + handlerInvoked: 0, + closeExpected: "", + retryAfterExpected: "5", + statusCodeExpected: http.StatusTooManyRequests, + }, { name: "retry-after enabled, request is exempt(/metrics)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/metrics?foo=bar", userAgent: "foo", @@ -75,8 +96,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/livez)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/livez?verbose", userAgent: "foo", @@ -87,8 +111,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/readyz)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/readyz?verbose", userAgent: "foo", @@ -99,8 +126,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/healthz)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/healthz?verbose", userAgent: "foo", @@ -111,8 +141,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(local loopback)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/api/v1/namespaces", userAgent: "kube-apiserver/", @@ -121,22 +154,13 @@ func TestWithRetryAfter(t *testing.T) { retryAfterExpected: "", statusCodeExpected: http.StatusOK, }, - { - name: "nil channel", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return nil - }, - requestURL: "/api/v1/namespaces", - userAgent: "foo", - handlerInvoked: 1, - closeExpected: "", - retryAfterExpected: "", - statusCodeExpected: http.StatusOK, - }, { name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*RetryAfterParams, bool) { + return &RetryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/readyz?verbose", userAgent: "foo", @@ -165,7 +189,7 @@ func TestWithRetryAfter(t *testing.T) { wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool { return false }, safeWG) - wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn()) + wrapped = WithRetryAfter(wrapped, test.when) req, err := http.NewRequest(http.MethodGet, test.requestURL, nil) if err != nil { @@ -197,11 +221,3 @@ func TestWithRetryAfter(t *testing.T) { }) } } - -func newChannel(closed bool) <-chan struct{} { - ch := make(chan struct{}) - if closed { - close(ch) - } - return ch -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index d7956c9f64e..d3cffbdac2b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -518,8 +518,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer { config.ShutdownSendRetryAfter = keepListening config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup) - if c.ShutdownSendRetryAfter { - handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + if shouldRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRetryAfterFn != nil { + handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn) } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) return handler diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index 07a887a5399..aebabbd76e5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -63,21 +63,31 @@ type ServerRunOptions struct { // If enabled, after ShutdownDelayDuration elapses, any incoming request is // rejected with a 429 status code and a 'Retry-After' response. ShutdownSendRetryAfter bool + + // StartupSendRetryAfterUntilReady once set will reject incoming requests with + // a 429 status code and a 'Retry-After' response header until the apiserver + // hasn't fully initialized. + // This option ensures that the system stays consistent even when requests + // are received before the server has been initialized. + // In particular, it prevents child deletion in case of GC or/and orphaned + // content in case of the namespaces controller. + StartupSendRetryAfterUntilReady bool } func NewServerRunOptions() *ServerRunOptions { defaults := server.NewConfig(serializer.CodecFactory{}) return &ServerRunOptions{ - MaxRequestsInFlight: defaults.MaxRequestsInFlight, - MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, - RequestTimeout: defaults.RequestTimeout, - LivezGracePeriod: defaults.LivezGracePeriod, - MinRequestTimeout: defaults.MinRequestTimeout, - ShutdownDelayDuration: defaults.ShutdownDelayDuration, - JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, - MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, - EnablePriorityAndFairness: true, - ShutdownSendRetryAfter: false, + MaxRequestsInFlight: defaults.MaxRequestsInFlight, + MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, + RequestTimeout: defaults.RequestTimeout, + LivezGracePeriod: defaults.LivezGracePeriod, + MinRequestTimeout: defaults.MinRequestTimeout, + ShutdownDelayDuration: defaults.ShutdownDelayDuration, + JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, + MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, + EnablePriorityAndFairness: true, + ShutdownSendRetryAfter: false, + StartupSendRetryAfterUntilReady: false, } } @@ -97,6 +107,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.MaxRequestBodyBytes = s.MaxRequestBodyBytes c.PublicAddress = s.AdvertiseAddress c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter + c.StartupSendRetryAfterUntilReady = s.StartupSendRetryAfterUntilReady return nil } @@ -261,5 +272,10 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+ "in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.") + fs.BoolVar(&s.StartupSendRetryAfterUntilReady, "startup-send-retry-after-until-ready", s.ShutdownSendRetryAfter, ""+ + "If true, incoming request(s) will be rejected with a '429' status code and a 'Retry-After' response header "+ + "until the apiserver has initialized. This option ensures that the system stays consistent even when requests "+ + "arrive at the server before it has been initialized.") + utilfeature.DefaultMutableFeatureGate.AddFlag(fs) }