Merge pull request #104281 from tkashem/not-ready-429

send retry-after until the apiserver is ready
This commit is contained in:
Kubernetes Prow Robot 2021-08-17 05:51:13 -07:00 committed by GitHub
commit fc5863b8b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 303 additions and 80 deletions

View File

@ -240,6 +240,15 @@ type Config struct {
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
// a 429 status code and a 'Retry-After' response header until the apiserver
// hasn't fully initialized.
// This option ensures that the system stays consistent even when requests
// are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and orphaned
// content in case of the namespaces controller.
StartupSendRetryAfterUntilReady bool
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -473,6 +482,46 @@ func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) {
}
}
// shouldAddWithRetryAfterFilter returns an appropriate ShouldRespondWithRetryAfterFunc
// if the apiserver should respond with a Retry-After response header based on option
// 'shutdown-send-retry-after' or 'startup-send-retry-after-until-ready'.
func (c *Config) shouldAddWithRetryAfterFilter() genericfilters.ShouldRespondWithRetryAfterFunc {
if !(c.ShutdownSendRetryAfter || c.StartupSendRetryAfterUntilReady) {
return nil
}
// follow lifecycle, avoiding go routines per request
const (
startup int32 = iota
running
terminating
)
state := startup
go func() {
<-c.lifecycleSignals.HasBeenReady.Signaled()
atomic.StoreInt32(&state, running)
<-c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()
atomic.StoreInt32(&state, terminating)
}()
return func() (*genericfilters.RetryAfterParams, bool) {
state := atomic.LoadInt32(&state)
switch {
case c.StartupSendRetryAfterUntilReady && state == startup:
return &genericfilters.RetryAfterParams{
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
}, true
case c.ShutdownSendRetryAfter && state == terminating:
return &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
default:
return nil, false
}
}
}
// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
@ -799,8 +848,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
if shouldRespondWithRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRespondWithRetryAfterFn != nil {
handler = genericfilters.WithRetryAfter(handler, shouldRespondWithRetryAfterFn)
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
@ -345,6 +346,159 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
}
}
func TestShouldRespondWithRetryAfterFunc(t *testing.T) {
tests := []struct {
name string
config *Config
addWithRetryAfterFilterExpected bool
sendRetryAfterExpected bool
retryAfterParamsExpected *genericfilters.RetryAfterParams
}{
{
name: "both shutdown-send-retry-after and startup-send-retry-after-until-ready are not enabled",
config: &Config{
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: false,
},
addWithRetryAfterFilterExpected: false,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "shutdown-send-retry-after is enabled, the apserver is shutting down",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
{
name: "shutdown-send-retry-after is enabled, the apserver is not in shutdown mode",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: false,
ShutdownSendRetryAfter: true,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "startup-send-retry-after-until-ready is enabled, the apserver is not ready yet",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: false,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
},
},
{
name: "startup-send-retry-after-until-ready is enabled, the apserver is ready",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: false,
}
c.lifecycleSignals.HasBeenReady.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is not ready",
config: &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
},
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
},
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is ready",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: false,
retryAfterParamsExpected: nil,
},
{
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is shutting down",
config: func() *Config {
c := &Config{
lifecycleSignals: newLifecycleSignals(),
StartupSendRetryAfterUntilReady: true,
ShutdownSendRetryAfter: true,
}
c.lifecycleSignals.HasBeenReady.Signal()
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
return c
}(),
addWithRetryAfterFilterExpected: true,
sendRetryAfterExpected: true,
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
shouldRespondWithRetryAfterFn := test.config.shouldAddWithRetryAfterFilter()
// we need to sleep some time to allow the goroutine launched by
// shouldAddWithRetryAfterFilter to finish
time.Sleep(100 * time.Millisecond)
if test.addWithRetryAfterFilterExpected != (shouldRespondWithRetryAfterFn != nil) {
t.Errorf("Expected add WithRetryAfter: %t, but got: %t", test.addWithRetryAfterFilterExpected, shouldRespondWithRetryAfterFn != nil)
}
if !test.addWithRetryAfterFilterExpected {
return
}
paramsGot, sendRetryAfterGot := shouldRespondWithRetryAfterFn()
if test.sendRetryAfterExpected != sendRetryAfterGot {
t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot)
}
if !reflect.DeepEqual(test.retryAfterParamsExpected, paramsGot) {
t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, paramsGot))
}
})
}
}
type testBackend struct {
events []*auditinternal.Event

View File

@ -36,8 +36,8 @@ var (
// with a Retry-After response, otherwise it returns false.
type isRequestExemptFunc func(*http.Request) bool
// retryAfterParams dictates how the Retry-After response is constructed
type retryAfterParams struct {
// RetryAfterParams dictates how the Retry-After response is constructed
type RetryAfterParams struct {
// TearDownConnection is true when we should send a 'Connection: close'
// header in the response so net/http can tear down the TCP connection.
TearDownConnection bool
@ -46,11 +46,11 @@ type retryAfterParams struct {
Message string
}
// shouldRespondWithRetryAfterFunc returns true if the requests should
// ShouldRespondWithRetryAfterFunc returns true if the requests should
// be rejected with a Retry-After response once certain conditions are met.
// The retryAfterParams returned contains instructions on how to
// The RetryAfterParams returned contains instructions on how to
// construct the Retry-After response.
type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool)
// WithRetryAfter rejects any incoming new request(s) with a 429
// if the specified shutdownDelayDurationElapsedFn channel is closed
@ -62,25 +62,13 @@ type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
// - 'Connection: close': tear down the TCP connection
//
// TODO: is there a way to merge WithWaitGroup and this filter?
func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler {
shutdownRetryAfterParams := &retryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}
func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler {
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) {
select {
case <-shutdownDelayDurationElapsedCh:
return shutdownRetryAfterParams, true
default:
return nil, false
}
})
return withRetryAfter(handler, isRequestExemptFromRetryAfter, when)
}
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler {
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
params, send := shouldRespondWithRetryAfterFn()
if !send || isRequestExemptFn(req) {

View File

@ -27,20 +27,20 @@ import (
func TestWithRetryAfter(t *testing.T) {
tests := []struct {
name string
shutdownDelayDurationElapsedFn func() <-chan struct{}
requestURL string
userAgent string
safeWaitGroupIsWaiting bool
handlerInvoked int
closeExpected string
retryAfterExpected string
statusCodeExpected int
name string
when ShouldRespondWithRetryAfterFunc
requestURL string
userAgent string
safeWaitGroupIsWaiting bool
handlerInvoked int
closeExpected string
retryAfterExpected string
statusCodeExpected int
}{
{
name: "retry-after disabled",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(false)
when: func() (*RetryAfterParams, bool) {
return nil, false
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
@ -51,8 +51,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is not exempt",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
@ -61,10 +64,28 @@ func TestWithRetryAfter(t *testing.T) {
retryAfterExpected: "5",
statusCodeExpected: http.StatusTooManyRequests,
},
{
name: "retry-after enabled, request is not exempt, no connection tear down",
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: false,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 0,
closeExpected: "",
retryAfterExpected: "5",
statusCodeExpected: http.StatusTooManyRequests,
},
{
name: "retry-after enabled, request is exempt(/metrics)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/metrics?foo=bar",
userAgent: "foo",
@ -75,8 +96,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/livez)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/livez?verbose",
userAgent: "foo",
@ -87,8 +111,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/readyz)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/readyz?verbose",
userAgent: "foo",
@ -99,8 +126,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(/healthz)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/healthz?verbose",
userAgent: "foo",
@ -111,8 +141,11 @@ func TestWithRetryAfter(t *testing.T) {
},
{
name: "retry-after enabled, request is exempt(local loopback)",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/api/v1/namespaces",
userAgent: "kube-apiserver/",
@ -121,22 +154,13 @@ func TestWithRetryAfter(t *testing.T) {
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "nil channel",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return nil
},
requestURL: "/api/v1/namespaces",
userAgent: "foo",
handlerInvoked: 1,
closeExpected: "",
retryAfterExpected: "",
statusCodeExpected: http.StatusOK,
},
{
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
return newChannel(true)
when: func() (*RetryAfterParams, bool) {
return &RetryAfterParams{
TearDownConnection: true,
Message: "The apiserver is shutting down, please try again later.",
}, true
},
requestURL: "/readyz?verbose",
userAgent: "foo",
@ -165,7 +189,7 @@ func TestWithRetryAfter(t *testing.T) {
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
return false
}, safeWG)
wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn())
wrapped = WithRetryAfter(wrapped, test.when)
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
if err != nil {
@ -197,11 +221,3 @@ func TestWithRetryAfter(t *testing.T) {
})
}
}
func newChannel(closed bool) <-chan struct{} {
ch := make(chan struct{})
if closed {
close(ch)
}
return ch
}

View File

@ -518,8 +518,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
config.ShutdownSendRetryAfter = keepListening
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
if shouldRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRetryAfterFn != nil {
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
}
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
return handler

View File

@ -63,21 +63,31 @@ type ServerRunOptions struct {
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
// a 429 status code and a 'Retry-After' response header until the apiserver
// hasn't fully initialized.
// This option ensures that the system stays consistent even when requests
// are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and orphaned
// content in case of the namespaces controller.
StartupSendRetryAfterUntilReady bool
}
func NewServerRunOptions() *ServerRunOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
return &ServerRunOptions{
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
StartupSendRetryAfterUntilReady: false,
}
}
@ -97,6 +107,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
c.PublicAddress = s.AdvertiseAddress
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
c.StartupSendRetryAfterUntilReady = s.StartupSendRetryAfterUntilReady
return nil
}
@ -261,5 +272,10 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
fs.BoolVar(&s.StartupSendRetryAfterUntilReady, "startup-send-retry-after-until-ready", s.ShutdownSendRetryAfter, ""+
"If true, incoming request(s) will be rejected with a '429' status code and a 'Retry-After' response header "+
"until the apiserver has initialized. This option ensures that the system stays consistent even when requests "+
"arrive at the server before it has been initialized.")
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
}