diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index d068c7e0c4c..df5566bc5bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -800,7 +800,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithCacheControl(handler) handler = genericfilters.WithHSTS(handler, c.HSTSDirectives) if c.ShutdownSendRetryAfter { - handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn) } handler = genericfilters.WithHTTPLogging(handler) if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go index c5e2daa8ed7..7b2343f07f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after.go @@ -46,11 +46,11 @@ type retryAfterParams struct { Message string } -// shouldRespondWithRetryAfterFunc returns true if the requests should +// ShouldRespondWithRetryAfterFunc returns true if the requests should // be rejected with a Retry-After response once certain conditions are met. // The retryAfterParams returned contains instructions on how to // construct the Retry-After response. -type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) +type ShouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) // WithRetryAfter rejects any incoming new request(s) with a 429 // if the specified shutdownDelayDurationElapsedFn channel is closed @@ -62,25 +62,40 @@ type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool) // - 'Connection: close': tear down the TCP connection // // TODO: is there a way to merge WithWaitGroup and this filter? -func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler { +func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler { + // NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter, + // otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully. + return withRetryAfter(handler, isRequestExemptFromRetryAfter, when) +} + +// NewShouldRespondWithRetryAfterFunc returns a ShouldRespondWithRetryAfterFunc +func NewShouldRespondWithRetryAfterFunc(shutdownSendRetryAfter bool, shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc { + if !shutdownSendRetryAfter { + return func() (*retryAfterParams, bool) { + return nil, false + } + } + + return newShutdownRetryAfterFunc(shutdownCh) +} + +func newShutdownRetryAfterFunc(shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc { shutdownRetryAfterParams := &retryAfterParams{ TearDownConnection: true, Message: "The apiserver is shutting down, please try again later.", } - // NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter, - // otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully. - return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) { + return func() (*retryAfterParams, bool) { select { - case <-shutdownDelayDurationElapsedCh: + case <-shutdownCh: return shutdownRetryAfterParams, true default: return nil, false } - }) + } } -func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler { +func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { params, send := shouldRespondWithRetryAfterFn() if !send || isRequestExemptFn(req) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go index cb2ec54f4c6..08b1e080974 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/with_retry_after_test.go @@ -17,8 +17,10 @@ limitations under the License. package filters import ( + "github.com/google/go-cmp/cmp" "net/http" "net/http/httptest" + "reflect" "testing" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" @@ -27,20 +29,20 @@ import ( func TestWithRetryAfter(t *testing.T) { tests := []struct { - name string - shutdownDelayDurationElapsedFn func() <-chan struct{} - requestURL string - userAgent string - safeWaitGroupIsWaiting bool - handlerInvoked int - closeExpected string - retryAfterExpected string - statusCodeExpected int + name string + when ShouldRespondWithRetryAfterFunc + requestURL string + userAgent string + safeWaitGroupIsWaiting bool + handlerInvoked int + closeExpected string + retryAfterExpected string + statusCodeExpected int }{ { name: "retry-after disabled", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(false) + when: func() (*retryAfterParams, bool) { + return nil, false }, requestURL: "/api/v1/namespaces", userAgent: "foo", @@ -51,8 +53,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is not exempt", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/api/v1/namespaces", userAgent: "foo", @@ -61,10 +66,28 @@ func TestWithRetryAfter(t *testing.T) { retryAfterExpected: "5", statusCodeExpected: http.StatusTooManyRequests, }, + { + name: "retry-after enabled, request is not exempt, no connection tear down", + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: false, + Message: "The apiserver is shutting down, please try again later.", + }, true + }, + requestURL: "/api/v1/namespaces", + userAgent: "foo", + handlerInvoked: 0, + closeExpected: "", + retryAfterExpected: "5", + statusCodeExpected: http.StatusTooManyRequests, + }, { name: "retry-after enabled, request is exempt(/metrics)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/metrics?foo=bar", userAgent: "foo", @@ -75,8 +98,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/livez)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/livez?verbose", userAgent: "foo", @@ -87,8 +113,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/readyz)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/readyz?verbose", userAgent: "foo", @@ -99,8 +128,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(/healthz)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/healthz?verbose", userAgent: "foo", @@ -111,8 +143,11 @@ func TestWithRetryAfter(t *testing.T) { }, { name: "retry-after enabled, request is exempt(local loopback)", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/api/v1/namespaces", userAgent: "kube-apiserver/", @@ -121,22 +156,13 @@ func TestWithRetryAfter(t *testing.T) { retryAfterExpected: "", statusCodeExpected: http.StatusOK, }, - { - name: "nil channel", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return nil - }, - requestURL: "/api/v1/namespaces", - userAgent: "foo", - handlerInvoked: 1, - closeExpected: "", - retryAfterExpected: "", - statusCodeExpected: http.StatusOK, - }, { name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode", - shutdownDelayDurationElapsedFn: func() <-chan struct{} { - return newChannel(true) + when: func() (*retryAfterParams, bool) { + return &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, true }, requestURL: "/readyz?verbose", userAgent: "foo", @@ -165,7 +191,7 @@ func TestWithRetryAfter(t *testing.T) { wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool { return false }, safeWG) - wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn()) + wrapped = WithRetryAfter(wrapped, test.when) req, err := http.NewRequest(http.MethodGet, test.requestURL, nil) if err != nil { @@ -198,6 +224,58 @@ func TestWithRetryAfter(t *testing.T) { } } +func TestNewShouldRespondWithRetryAfterFunc(t *testing.T) { + tests := []struct { + name string + shutdownSendRetryAfter bool + shutdownCh <-chan struct{} + sendRetryAfterExpected bool + retryAfterParamsExpected *retryAfterParams + }{ + { + name: "shutdown-send-retry-after is disabled", + shutdownSendRetryAfter: false, + shutdownCh: newChannel(true), + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + { + name: "shutdown-send-retry-after is enabled, shutting down", + shutdownSendRetryAfter: true, + shutdownCh: newChannel(true), + sendRetryAfterExpected: true, + retryAfterParamsExpected: &retryAfterParams{ + TearDownConnection: true, + Message: "The apiserver is shutting down, please try again later.", + }, + }, + { + name: "shutdown-send-retry-after is enabled, not shutting down", + shutdownSendRetryAfter: true, + shutdownCh: newChannel(false), + sendRetryAfterExpected: false, + retryAfterParamsExpected: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fn := NewShouldRespondWithRetryAfterFunc(test.shutdownSendRetryAfter, test.shutdownCh) + if fn == nil { + t.Fatal("Expected a non nil ShouldRespondWithRetryAfterFunc") + } + + retryAfterParamsGot, sendRetryAfterGot := fn() + if test.sendRetryAfterExpected != sendRetryAfterGot { + t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot) + } + if !reflect.DeepEqual(test.retryAfterParamsExpected, retryAfterParamsGot) { + t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, retryAfterParamsGot)) + } + }) + } +} + func newChannel(closed bool) <-chan struct{} { ch := make(chan struct{}) if closed { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index d7956c9f64e..45047df9c0d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -519,7 +519,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer { config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup) if c.ShutdownSendRetryAfter { - handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()) + handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn) } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) return handler