diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 7aaf873d4f9..f99d0ca1c51 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -239,15 +239,6 @@ type Config struct { // rejected with a 429 status code and a 'Retry-After' response. ShutdownSendRetryAfter bool - // StartupSendRetryAfterUntilReady once set will reject incoming requests with - // a 429 status code and a 'Retry-After' response header until the apiserver - // hasn't fully initialized. - // This option ensures that the system stays consistent even when requests - // are received before the server has been initialized. - // In particular, it prevents child deletion in case of GC or/and orphaned - // content in case of the namespaces controller. - StartupSendRetryAfterUntilReady bool - //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -481,46 +472,6 @@ func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) { } } -// shouldAddWithRetryAfterFilter returns an appropriate ShouldRespondWithRetryAfterFunc -// if the apiserver should respond with a Retry-After response header based on option -// 'shutdown-send-retry-after' or 'startup-send-retry-after-until-ready'. -func (c *Config) shouldAddWithRetryAfterFilter() genericfilters.ShouldRespondWithRetryAfterFunc { - if !(c.ShutdownSendRetryAfter || c.StartupSendRetryAfterUntilReady) { - return nil - } - - // follow lifecycle, avoiding go routines per request - const ( - startup int32 = iota - running - terminating - ) - state := startup - go func() { - <-c.lifecycleSignals.HasBeenReady.Signaled() - atomic.StoreInt32(&state, running) - <-c.lifecycleSignals.AfterShutdownDelayDuration.Signaled() - atomic.StoreInt32(&state, terminating) - }() - - return func() (*genericfilters.RetryAfterParams, bool) { - state := atomic.LoadInt32(&state) - switch { - case c.StartupSendRetryAfterUntilReady && state == startup: - return &genericfilters.RetryAfterParams{ - Message: "The apiserver hasn't been fully initialized yet, please try again later.", - }, true - case c.ShutdownSendRetryAfter && state == terminating: - return &genericfilters.RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true - default: - return nil, false - } - } -} - // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { @@ -847,8 +798,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithWarningRecorder(handler) handler = genericapifilters.WithCacheControl(handler) handler = genericfilters.WithHSTS(handler, c.HSTSDirectives) - if shouldRespondWithRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRespondWithRetryAfterFn != nil { - handler = genericfilters.WithRetryAfter(handler, shouldRespondWithRetryAfterFn) + if c.ShutdownSendRetryAfter { + handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) } handler = genericfilters.WithHTTPLogging(handler) if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 283b915fe85..b4be78f5d19 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -37,7 +37,6 @@ import ( "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" - genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -346,159 +345,6 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { } } -func TestShouldRespondWithRetryAfterFunc(t *testing.T) { - tests := []struct { - name string - config *Config - addWithRetryAfterFilterExpected bool - sendRetryAfterExpected bool - retryAfterParamsExpected *genericfilters.RetryAfterParams - }{ - { - name: "both shutdown-send-retry-after and startup-send-retry-after-until-ready are not enabled", - config: &Config{ - StartupSendRetryAfterUntilReady: false, - ShutdownSendRetryAfter: false, - }, - addWithRetryAfterFilterExpected: false, - sendRetryAfterExpected: false, - retryAfterParamsExpected: nil, - }, - { - name: "shutdown-send-retry-after is enabled, the apserver is shutting down", - config: func() *Config { - c := &Config{ - lifecycleSignals: newLifecycleSignals(), - StartupSendRetryAfterUntilReady: false, - ShutdownSendRetryAfter: true, - } - c.lifecycleSignals.HasBeenReady.Signal() - c.lifecycleSignals.AfterShutdownDelayDuration.Signal() - return c - }(), - addWithRetryAfterFilterExpected: true, - sendRetryAfterExpected: true, - retryAfterParamsExpected: &genericfilters.RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, - }, - { - name: "shutdown-send-retry-after is enabled, the apserver is not in shutdown mode", - config: &Config{ - lifecycleSignals: newLifecycleSignals(), - StartupSendRetryAfterUntilReady: false, - ShutdownSendRetryAfter: true, - }, - addWithRetryAfterFilterExpected: true, - sendRetryAfterExpected: false, - retryAfterParamsExpected: nil, - }, - { - name: "startup-send-retry-after-until-ready is enabled, the apserver is not ready yet", - config: &Config{ - lifecycleSignals: newLifecycleSignals(), - StartupSendRetryAfterUntilReady: true, - ShutdownSendRetryAfter: false, - }, - addWithRetryAfterFilterExpected: true, - sendRetryAfterExpected: true, - retryAfterParamsExpected: &genericfilters.RetryAfterParams{ - TearDownConnection: false, - Message: "The apiserver hasn't been fully initialized yet, please try again later.", - }, - }, - { - name: "startup-send-retry-after-until-ready is enabled, the apserver is ready", - config: func() *Config { - c := &Config{ - lifecycleSignals: newLifecycleSignals(), - StartupSendRetryAfterUntilReady: true, - ShutdownSendRetryAfter: false, - } - c.lifecycleSignals.HasBeenReady.Signal() - return c - }(), - addWithRetryAfterFilterExpected: true, - sendRetryAfterExpected: false, - retryAfterParamsExpected: nil, - }, - { - name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is not ready", - config: &Config{ - lifecycleSignals: newLifecycleSignals(), - StartupSendRetryAfterUntilReady: true, - ShutdownSendRetryAfter: true, - }, - addWithRetryAfterFilterExpected: true, - sendRetryAfterExpected: true, - retryAfterParamsExpected: &genericfilters.RetryAfterParams{ - TearDownConnection: false, - Message: "The apiserver hasn't been fully initialized yet, please try again later.", - }, - }, - { - name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is ready", - config: func() *Config { - c := &Config{ - lifecycleSignals: newLifecycleSignals(), - StartupSendRetryAfterUntilReady: true, - ShutdownSendRetryAfter: true, - } - c.lifecycleSignals.HasBeenReady.Signal() - return c - }(), - addWithRetryAfterFilterExpected: true, - sendRetryAfterExpected: false, - retryAfterParamsExpected: nil, - }, - { - name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is shutting down", - config: func() *Config { - c := &Config{ - lifecycleSignals: newLifecycleSignals(), - StartupSendRetryAfterUntilReady: true, - ShutdownSendRetryAfter: true, - } - c.lifecycleSignals.HasBeenReady.Signal() - c.lifecycleSignals.AfterShutdownDelayDuration.Signal() - return c - }(), - addWithRetryAfterFilterExpected: true, - sendRetryAfterExpected: true, - retryAfterParamsExpected: &genericfilters.RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - shouldRespondWithRetryAfterFn := test.config.shouldAddWithRetryAfterFilter() - - // we need to sleep some time to allow the goroutine launched by - // shouldAddWithRetryAfterFilter to finish - time.Sleep(100 * time.Millisecond) - - if test.addWithRetryAfterFilterExpected != (shouldRespondWithRetryAfterFn != nil) { - t.Errorf("Expected add WithRetryAfter: %t, but got: %t", test.addWithRetryAfterFilterExpected, shouldRespondWithRetryAfterFn != nil) - } - if !test.addWithRetryAfterFilterExpected { - return - } - - paramsGot, sendRetryAfterGot := shouldRespondWithRetryAfterFn() - if test.sendRetryAfterExpected != sendRetryAfterGot { - t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot) - } - if !reflect.DeepEqual(test.retryAfterParamsExpected, paramsGot) { - t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, paramsGot)) - } - }) - } -} - type testBackend struct { events []*auditinternal.Event diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go index c3c5cda1f12..c5e2daa8ed7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go @@ -36,8 +36,8 @@ var ( // with a Retry-After response, otherwise it returns false. type isRequestExemptFunc func(*http.Request) bool -// RetryAfterParams dictates how the Retry-After response is constructed -type RetryAfterParams struct { +// retryAfterParams dictates how the Retry-After response is constructed +type retryAfterParams struct { // TearDownConnection is true when we should send a 'Connection: close' // header in the response so net/http can tear down the TCP connection. TearDownConnection bool @@ -46,11 +46,11 @@ type RetryAfterParams struct { Message string } -// ShouldRespondWithRetryAfterFunc returns true if the requests should +// shouldRespondWithRetryAfterFunc returns true if the requests should // be rejected with a Retry-After response once certain conditions are met. -// The RetryAfterParams returned contains instructions on how to +// The retryAfterParams returned contains instructions on how to // construct the Retry-After response. -type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool) +type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) // WithRetryAfter rejects any incoming new request(s) with a 429 // if the specified shutdownDelayDurationElapsedFn channel is closed @@ -62,13 +62,25 @@ type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool) // - 'Connection: close': tear down the TCP connection // // TODO: is there a way to merge WithWaitGroup and this filter? -func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler { +func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler { + shutdownRetryAfterParams := &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + } + // NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter, // otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully. - return withRetryAfter(handler, isRequestExemptFromRetryAfter, when) + return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) { + select { + case <-shutdownDelayDurationElapsedCh: + return shutdownRetryAfterParams, true + default: + return nil, false + } + }) } -func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler { +func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { params, send := shouldRespondWithRetryAfterFn() if !send || isRequestExemptFn(req) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go index 0200e6b7ca0..cb2ec54f4c6 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go @@ -27,20 +27,20 @@ import ( func TestWithRetryAfter(t *testing.T) { tests := []struct { - name string - when ShouldRespondWithRetryAfterFunc - requestURL string - userAgent string - safeWaitGroupIsWaiting bool - handlerInvoked int - closeExpected string - retryAfterExpected string - statusCodeExpected int + name string + shutdownDelayDurationElapsedFn func() <-chan struct{} + requestURL string + userAgent string + safeWaitGroupIsWaiting bool + handlerInvoked int + closeExpected string + retryAfterExpected string + statusCodeExpected int }{ { name: "retry-after disabled", - when: func() (*RetryAfterParams, bool) { - return nil, false + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(false) }, requestURL: "/api/v1/namespaces", userAgent: "foo", @@ -51,11 +51,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is not exempt", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) }, requestURL: "/api/v1/namespaces", userAgent: "foo", @@ -64,28 +61,10 @@ func TestWithRetryAfter(t *testing.T) { retryAfterExpected: "5", statusCodeExpected: http.StatusTooManyRequests, }, - { - name: "retry-after enabled, request is not exempt, no connection tear down", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: false, - Message: "The apiserver is shutting down, please try again later.", - }, true - }, - requestURL: "/api/v1/namespaces", - userAgent: "foo", - handlerInvoked: 0, - closeExpected: "", - retryAfterExpected: "5", - statusCodeExpected: http.StatusTooManyRequests, - }, { name: "retry-after enabled, request is exempt(/metrics)", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) }, requestURL: "/metrics?foo=bar", userAgent: "foo", @@ -96,11 +75,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/livez)", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) }, requestURL: "/livez?verbose", userAgent: "foo", @@ -111,11 +87,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/readyz)", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) }, requestURL: "/readyz?verbose", userAgent: "foo", @@ -126,11 +99,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/healthz)", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) }, requestURL: "/healthz?verbose", userAgent: "foo", @@ -141,11 +111,8 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(local loopback)", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) }, requestURL: "/api/v1/namespaces", userAgent: "kube-apiserver/", @@ -154,13 +121,22 @@ func TestWithRetryAfter(t *testing.T) { retryAfterExpected: "", statusCodeExpected: http.StatusOK, }, + { + name: "nil channel", + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return nil + }, + requestURL: "/api/v1/namespaces", + userAgent: "foo", + handlerInvoked: 1, + closeExpected: "", + retryAfterExpected: "", + statusCodeExpected: http.StatusOK, + }, { name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode", - when: func() (*RetryAfterParams, bool) { - return &RetryAfterParams{ - TearDownConnection: true, - Message: "The apiserver is shutting down, please try again later.", - }, true + shutdownDelayDurationElapsedFn: func() <-chan struct{} { + return newChannel(true) }, requestURL: "/readyz?verbose", userAgent: "foo", @@ -189,7 +165,7 @@ func TestWithRetryAfter(t *testing.T) { wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool { return false }, safeWG) - wrapped = WithRetryAfter(wrapped, test.when) + wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn()) req, err := http.NewRequest(http.MethodGet, test.requestURL, nil) if err != nil { @@ -221,3 +197,11 @@ func TestWithRetryAfter(t *testing.T) { }) } } + +func newChannel(closed bool) <-chan struct{} { + ch := make(chan struct{}) + if closed { + close(ch) + } + return ch +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index d3cffbdac2b..d7956c9f64e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -518,8 +518,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer { config.ShutdownSendRetryAfter = keepListening config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup) - if shouldRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRetryAfterFn != nil { - handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn) + if c.ShutdownSendRetryAfter { + handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) return handler diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index aebabbd76e5..07a887a5399 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -63,31 +63,21 @@ type ServerRunOptions struct { // If enabled, after ShutdownDelayDuration elapses, any incoming request is // rejected with a 429 status code and a 'Retry-After' response. ShutdownSendRetryAfter bool - - // StartupSendRetryAfterUntilReady once set will reject incoming requests with - // a 429 status code and a 'Retry-After' response header until the apiserver - // hasn't fully initialized. - // This option ensures that the system stays consistent even when requests - // are received before the server has been initialized. - // In particular, it prevents child deletion in case of GC or/and orphaned - // content in case of the namespaces controller. - StartupSendRetryAfterUntilReady bool } func NewServerRunOptions() *ServerRunOptions { defaults := server.NewConfig(serializer.CodecFactory{}) return &ServerRunOptions{ - MaxRequestsInFlight: defaults.MaxRequestsInFlight, - MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, - RequestTimeout: defaults.RequestTimeout, - LivezGracePeriod: defaults.LivezGracePeriod, - MinRequestTimeout: defaults.MinRequestTimeout, - ShutdownDelayDuration: defaults.ShutdownDelayDuration, - JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, - MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, - EnablePriorityAndFairness: true, - ShutdownSendRetryAfter: false, - StartupSendRetryAfterUntilReady: false, + MaxRequestsInFlight: defaults.MaxRequestsInFlight, + MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight, + RequestTimeout: defaults.RequestTimeout, + LivezGracePeriod: defaults.LivezGracePeriod, + MinRequestTimeout: defaults.MinRequestTimeout, + ShutdownDelayDuration: defaults.ShutdownDelayDuration, + JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, + MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, + EnablePriorityAndFairness: true, + ShutdownSendRetryAfter: false, } } @@ -107,7 +97,6 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error { c.MaxRequestBodyBytes = s.MaxRequestBodyBytes c.PublicAddress = s.AdvertiseAddress c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter - c.StartupSendRetryAfterUntilReady = s.StartupSendRetryAfterUntilReady return nil } @@ -272,10 +261,5 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+ "in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.") - fs.BoolVar(&s.StartupSendRetryAfterUntilReady, "startup-send-retry-after-until-ready", s.ShutdownSendRetryAfter, ""+ - "If true, incoming request(s) will be rejected with a '429' status code and a 'Retry-After' response header "+ - "until the apiserver has initialized. This option ensures that the system stays consistent even when requests "+ - "arrive at the server before it has been initialized.") - utilfeature.DefaultMutableFeatureGate.AddFlag(fs) }