mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
apiserver: refactor WithRetryAfter server filter
This commit is contained in:
parent
3182b69e97
commit
83889ae594
@ -800,7 +800,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
handler = genericapifilters.WithCacheControl(handler)
|
handler = genericapifilters.WithCacheControl(handler)
|
||||||
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
|
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
|
||||||
if c.ShutdownSendRetryAfter {
|
if c.ShutdownSendRetryAfter {
|
||||||
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
|
shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
|
||||||
|
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
|
||||||
}
|
}
|
||||||
handler = genericfilters.WithHTTPLogging(handler)
|
handler = genericfilters.WithHTTPLogging(handler)
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
|
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
|
||||||
|
@ -46,11 +46,11 @@ type retryAfterParams struct {
|
|||||||
Message string
|
Message string
|
||||||
}
|
}
|
||||||
|
|
||||||
// shouldRespondWithRetryAfterFunc returns true if the requests should
|
// ShouldRespondWithRetryAfterFunc returns true if the requests should
|
||||||
// be rejected with a Retry-After response once certain conditions are met.
|
// be rejected with a Retry-After response once certain conditions are met.
|
||||||
// The retryAfterParams returned contains instructions on how to
|
// The retryAfterParams returned contains instructions on how to
|
||||||
// construct the Retry-After response.
|
// construct the Retry-After response.
|
||||||
type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
|
type ShouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
|
||||||
|
|
||||||
// WithRetryAfter rejects any incoming new request(s) with a 429
|
// WithRetryAfter rejects any incoming new request(s) with a 429
|
||||||
// if the specified shutdownDelayDurationElapsedFn channel is closed
|
// if the specified shutdownDelayDurationElapsedFn channel is closed
|
||||||
@ -62,25 +62,40 @@ type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
|
|||||||
// - 'Connection: close': tear down the TCP connection
|
// - 'Connection: close': tear down the TCP connection
|
||||||
//
|
//
|
||||||
// TODO: is there a way to merge WithWaitGroup and this filter?
|
// TODO: is there a way to merge WithWaitGroup and this filter?
|
||||||
func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler {
|
func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler {
|
||||||
|
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
|
||||||
|
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
|
||||||
|
return withRetryAfter(handler, isRequestExemptFromRetryAfter, when)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewShouldRespondWithRetryAfterFunc returns a ShouldRespondWithRetryAfterFunc
|
||||||
|
func NewShouldRespondWithRetryAfterFunc(shutdownSendRetryAfter bool, shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc {
|
||||||
|
if !shutdownSendRetryAfter {
|
||||||
|
return func() (*retryAfterParams, bool) {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return newShutdownRetryAfterFunc(shutdownCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newShutdownRetryAfterFunc(shutdownCh <-chan struct{}) ShouldRespondWithRetryAfterFunc {
|
||||||
shutdownRetryAfterParams := &retryAfterParams{
|
shutdownRetryAfterParams := &retryAfterParams{
|
||||||
TearDownConnection: true,
|
TearDownConnection: true,
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
|
return func() (*retryAfterParams, bool) {
|
||||||
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
|
|
||||||
return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) {
|
|
||||||
select {
|
select {
|
||||||
case <-shutdownDelayDurationElapsedCh:
|
case <-shutdownCh:
|
||||||
return shutdownRetryAfterParams, true
|
return shutdownRetryAfterParams, true
|
||||||
default:
|
default:
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler {
|
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
params, send := shouldRespondWithRetryAfterFn()
|
params, send := shouldRespondWithRetryAfterFn()
|
||||||
if !send || isRequestExemptFn(req) {
|
if !send || isRequestExemptFn(req) {
|
||||||
|
@ -17,8 +17,10 @@ limitations under the License.
|
|||||||
package filters
|
package filters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
||||||
@ -28,7 +30,7 @@ import (
|
|||||||
func TestWithRetryAfter(t *testing.T) {
|
func TestWithRetryAfter(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
shutdownDelayDurationElapsedFn func() <-chan struct{}
|
when ShouldRespondWithRetryAfterFunc
|
||||||
requestURL string
|
requestURL string
|
||||||
userAgent string
|
userAgent string
|
||||||
safeWaitGroupIsWaiting bool
|
safeWaitGroupIsWaiting bool
|
||||||
@ -39,8 +41,8 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "retry-after disabled",
|
name: "retry-after disabled",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(false)
|
return nil, false
|
||||||
},
|
},
|
||||||
requestURL: "/api/v1/namespaces",
|
requestURL: "/api/v1/namespaces",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -51,8 +53,11 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is not exempt",
|
name: "retry-after enabled, request is not exempt",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(true)
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
},
|
},
|
||||||
requestURL: "/api/v1/namespaces",
|
requestURL: "/api/v1/namespaces",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -61,10 +66,28 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
retryAfterExpected: "5",
|
retryAfterExpected: "5",
|
||||||
statusCodeExpected: http.StatusTooManyRequests,
|
statusCodeExpected: http.StatusTooManyRequests,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "retry-after enabled, request is not exempt, no connection tear down",
|
||||||
|
when: func() (*retryAfterParams, bool) {
|
||||||
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: false,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
|
},
|
||||||
|
requestURL: "/api/v1/namespaces",
|
||||||
|
userAgent: "foo",
|
||||||
|
handlerInvoked: 0,
|
||||||
|
closeExpected: "",
|
||||||
|
retryAfterExpected: "5",
|
||||||
|
statusCodeExpected: http.StatusTooManyRequests,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/metrics)",
|
name: "retry-after enabled, request is exempt(/metrics)",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(true)
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
},
|
},
|
||||||
requestURL: "/metrics?foo=bar",
|
requestURL: "/metrics?foo=bar",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -75,8 +98,11 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/livez)",
|
name: "retry-after enabled, request is exempt(/livez)",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(true)
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
},
|
},
|
||||||
requestURL: "/livez?verbose",
|
requestURL: "/livez?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -87,8 +113,11 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/readyz)",
|
name: "retry-after enabled, request is exempt(/readyz)",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(true)
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
},
|
},
|
||||||
requestURL: "/readyz?verbose",
|
requestURL: "/readyz?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -99,8 +128,11 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/healthz)",
|
name: "retry-after enabled, request is exempt(/healthz)",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(true)
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
},
|
},
|
||||||
requestURL: "/healthz?verbose",
|
requestURL: "/healthz?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -111,8 +143,11 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(local loopback)",
|
name: "retry-after enabled, request is exempt(local loopback)",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(true)
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
},
|
},
|
||||||
requestURL: "/api/v1/namespaces",
|
requestURL: "/api/v1/namespaces",
|
||||||
userAgent: "kube-apiserver/",
|
userAgent: "kube-apiserver/",
|
||||||
@ -121,22 +156,13 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
retryAfterExpected: "",
|
retryAfterExpected: "",
|
||||||
statusCodeExpected: http.StatusOK,
|
statusCodeExpected: http.StatusOK,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "nil channel",
|
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
requestURL: "/api/v1/namespaces",
|
|
||||||
userAgent: "foo",
|
|
||||||
handlerInvoked: 1,
|
|
||||||
closeExpected: "",
|
|
||||||
retryAfterExpected: "",
|
|
||||||
statusCodeExpected: http.StatusOK,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
|
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
|
||||||
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
when: func() (*retryAfterParams, bool) {
|
||||||
return newChannel(true)
|
return &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}, true
|
||||||
},
|
},
|
||||||
requestURL: "/readyz?verbose",
|
requestURL: "/readyz?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -165,7 +191,7 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
|
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
|
||||||
return false
|
return false
|
||||||
}, safeWG)
|
}, safeWG)
|
||||||
wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn())
|
wrapped = WithRetryAfter(wrapped, test.when)
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
|
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -198,6 +224,58 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewShouldRespondWithRetryAfterFunc(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
shutdownSendRetryAfter bool
|
||||||
|
shutdownCh <-chan struct{}
|
||||||
|
sendRetryAfterExpected bool
|
||||||
|
retryAfterParamsExpected *retryAfterParams
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "shutdown-send-retry-after is disabled",
|
||||||
|
shutdownSendRetryAfter: false,
|
||||||
|
shutdownCh: newChannel(true),
|
||||||
|
sendRetryAfterExpected: false,
|
||||||
|
retryAfterParamsExpected: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "shutdown-send-retry-after is enabled, shutting down",
|
||||||
|
shutdownSendRetryAfter: true,
|
||||||
|
shutdownCh: newChannel(true),
|
||||||
|
sendRetryAfterExpected: true,
|
||||||
|
retryAfterParamsExpected: &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "shutdown-send-retry-after is enabled, not shutting down",
|
||||||
|
shutdownSendRetryAfter: true,
|
||||||
|
shutdownCh: newChannel(false),
|
||||||
|
sendRetryAfterExpected: false,
|
||||||
|
retryAfterParamsExpected: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
fn := NewShouldRespondWithRetryAfterFunc(test.shutdownSendRetryAfter, test.shutdownCh)
|
||||||
|
if fn == nil {
|
||||||
|
t.Fatal("Expected a non nil ShouldRespondWithRetryAfterFunc")
|
||||||
|
}
|
||||||
|
|
||||||
|
retryAfterParamsGot, sendRetryAfterGot := fn()
|
||||||
|
if test.sendRetryAfterExpected != sendRetryAfterGot {
|
||||||
|
t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(test.retryAfterParamsExpected, retryAfterParamsGot) {
|
||||||
|
t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, retryAfterParamsGot))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newChannel(closed bool) <-chan struct{} {
|
func newChannel(closed bool) <-chan struct{} {
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
if closed {
|
if closed {
|
||||||
|
@ -519,7 +519,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
|
|||||||
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
|
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
|
||||||
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
||||||
if c.ShutdownSendRetryAfter {
|
if c.ShutdownSendRetryAfter {
|
||||||
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
|
shouldRetryAfterFn := genericfilters.NewShouldRespondWithRetryAfterFunc(c.ShutdownSendRetryAfter, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
|
||||||
|
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
|
||||||
}
|
}
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
||||||
return handler
|
return handler
|
||||||
|
Loading…
Reference in New Issue
Block a user