mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Revert "Merge pull request #104281 from tkashem/not-ready-429"
This reverts commitfc5863b8b2
, reversing changes made to027fe2554f
.
This commit is contained in:
parent
8844d3092a
commit
f9f0872590
@ -239,15 +239,6 @@ type Config struct {
|
|||||||
// rejected with a 429 status code and a 'Retry-After' response.
|
// rejected with a 429 status code and a 'Retry-After' response.
|
||||||
ShutdownSendRetryAfter bool
|
ShutdownSendRetryAfter bool
|
||||||
|
|
||||||
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
|
|
||||||
// a 429 status code and a 'Retry-After' response header until the apiserver
|
|
||||||
// hasn't fully initialized.
|
|
||||||
// This option ensures that the system stays consistent even when requests
|
|
||||||
// are received before the server has been initialized.
|
|
||||||
// In particular, it prevents child deletion in case of GC or/and orphaned
|
|
||||||
// content in case of the namespaces controller.
|
|
||||||
StartupSendRetryAfterUntilReady bool
|
|
||||||
|
|
||||||
//===========================================================================
|
//===========================================================================
|
||||||
// values below here are targets for removal
|
// values below here are targets for removal
|
||||||
//===========================================================================
|
//===========================================================================
|
||||||
@ -481,46 +472,6 @@ func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// shouldAddWithRetryAfterFilter returns an appropriate ShouldRespondWithRetryAfterFunc
|
|
||||||
// if the apiserver should respond with a Retry-After response header based on option
|
|
||||||
// 'shutdown-send-retry-after' or 'startup-send-retry-after-until-ready'.
|
|
||||||
func (c *Config) shouldAddWithRetryAfterFilter() genericfilters.ShouldRespondWithRetryAfterFunc {
|
|
||||||
if !(c.ShutdownSendRetryAfter || c.StartupSendRetryAfterUntilReady) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// follow lifecycle, avoiding go routines per request
|
|
||||||
const (
|
|
||||||
startup int32 = iota
|
|
||||||
running
|
|
||||||
terminating
|
|
||||||
)
|
|
||||||
state := startup
|
|
||||||
go func() {
|
|
||||||
<-c.lifecycleSignals.HasBeenReady.Signaled()
|
|
||||||
atomic.StoreInt32(&state, running)
|
|
||||||
<-c.lifecycleSignals.AfterShutdownDelayDuration.Signaled()
|
|
||||||
atomic.StoreInt32(&state, terminating)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return func() (*genericfilters.RetryAfterParams, bool) {
|
|
||||||
state := atomic.LoadInt32(&state)
|
|
||||||
switch {
|
|
||||||
case c.StartupSendRetryAfterUntilReady && state == startup:
|
|
||||||
return &genericfilters.RetryAfterParams{
|
|
||||||
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
|
|
||||||
}, true
|
|
||||||
case c.ShutdownSendRetryAfter && state == terminating:
|
|
||||||
return &genericfilters.RetryAfterParams{
|
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
default:
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Complete fills in any fields not set that are required to have valid data and can be derived
|
// Complete fills in any fields not set that are required to have valid data and can be derived
|
||||||
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
|
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
|
||||||
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
|
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
|
||||||
@ -847,8 +798,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
handler = genericapifilters.WithWarningRecorder(handler)
|
handler = genericapifilters.WithWarningRecorder(handler)
|
||||||
handler = genericapifilters.WithCacheControl(handler)
|
handler = genericapifilters.WithCacheControl(handler)
|
||||||
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
|
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
|
||||||
if shouldRespondWithRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRespondWithRetryAfterFn != nil {
|
if c.ShutdownSendRetryAfter {
|
||||||
handler = genericfilters.WithRetryAfter(handler, shouldRespondWithRetryAfterFn)
|
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
|
||||||
}
|
}
|
||||||
handler = genericfilters.WithHTTPLogging(handler)
|
handler = genericfilters.WithHTTPLogging(handler)
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
|
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
|
||||||
|
@ -37,7 +37,6 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
|
||||||
"k8s.io/apiserver/pkg/server/healthz"
|
"k8s.io/apiserver/pkg/server/healthz"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
@ -346,159 +345,6 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestShouldRespondWithRetryAfterFunc(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
config *Config
|
|
||||||
addWithRetryAfterFilterExpected bool
|
|
||||||
sendRetryAfterExpected bool
|
|
||||||
retryAfterParamsExpected *genericfilters.RetryAfterParams
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "both shutdown-send-retry-after and startup-send-retry-after-until-ready are not enabled",
|
|
||||||
config: &Config{
|
|
||||||
StartupSendRetryAfterUntilReady: false,
|
|
||||||
ShutdownSendRetryAfter: false,
|
|
||||||
},
|
|
||||||
addWithRetryAfterFilterExpected: false,
|
|
||||||
sendRetryAfterExpected: false,
|
|
||||||
retryAfterParamsExpected: nil,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "shutdown-send-retry-after is enabled, the apserver is shutting down",
|
|
||||||
config: func() *Config {
|
|
||||||
c := &Config{
|
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
|
||||||
StartupSendRetryAfterUntilReady: false,
|
|
||||||
ShutdownSendRetryAfter: true,
|
|
||||||
}
|
|
||||||
c.lifecycleSignals.HasBeenReady.Signal()
|
|
||||||
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
|
|
||||||
return c
|
|
||||||
}(),
|
|
||||||
addWithRetryAfterFilterExpected: true,
|
|
||||||
sendRetryAfterExpected: true,
|
|
||||||
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
|
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "shutdown-send-retry-after is enabled, the apserver is not in shutdown mode",
|
|
||||||
config: &Config{
|
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
|
||||||
StartupSendRetryAfterUntilReady: false,
|
|
||||||
ShutdownSendRetryAfter: true,
|
|
||||||
},
|
|
||||||
addWithRetryAfterFilterExpected: true,
|
|
||||||
sendRetryAfterExpected: false,
|
|
||||||
retryAfterParamsExpected: nil,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "startup-send-retry-after-until-ready is enabled, the apserver is not ready yet",
|
|
||||||
config: &Config{
|
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
|
||||||
StartupSendRetryAfterUntilReady: true,
|
|
||||||
ShutdownSendRetryAfter: false,
|
|
||||||
},
|
|
||||||
addWithRetryAfterFilterExpected: true,
|
|
||||||
sendRetryAfterExpected: true,
|
|
||||||
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
|
|
||||||
TearDownConnection: false,
|
|
||||||
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "startup-send-retry-after-until-ready is enabled, the apserver is ready",
|
|
||||||
config: func() *Config {
|
|
||||||
c := &Config{
|
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
|
||||||
StartupSendRetryAfterUntilReady: true,
|
|
||||||
ShutdownSendRetryAfter: false,
|
|
||||||
}
|
|
||||||
c.lifecycleSignals.HasBeenReady.Signal()
|
|
||||||
return c
|
|
||||||
}(),
|
|
||||||
addWithRetryAfterFilterExpected: true,
|
|
||||||
sendRetryAfterExpected: false,
|
|
||||||
retryAfterParamsExpected: nil,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is not ready",
|
|
||||||
config: &Config{
|
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
|
||||||
StartupSendRetryAfterUntilReady: true,
|
|
||||||
ShutdownSendRetryAfter: true,
|
|
||||||
},
|
|
||||||
addWithRetryAfterFilterExpected: true,
|
|
||||||
sendRetryAfterExpected: true,
|
|
||||||
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
|
|
||||||
TearDownConnection: false,
|
|
||||||
Message: "The apiserver hasn't been fully initialized yet, please try again later.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is ready",
|
|
||||||
config: func() *Config {
|
|
||||||
c := &Config{
|
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
|
||||||
StartupSendRetryAfterUntilReady: true,
|
|
||||||
ShutdownSendRetryAfter: true,
|
|
||||||
}
|
|
||||||
c.lifecycleSignals.HasBeenReady.Signal()
|
|
||||||
return c
|
|
||||||
}(),
|
|
||||||
addWithRetryAfterFilterExpected: true,
|
|
||||||
sendRetryAfterExpected: false,
|
|
||||||
retryAfterParamsExpected: nil,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "both shutdown-send-retry-after is enabled and startup-send-retry-after-until-ready are enabled, the apserver is shutting down",
|
|
||||||
config: func() *Config {
|
|
||||||
c := &Config{
|
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
|
||||||
StartupSendRetryAfterUntilReady: true,
|
|
||||||
ShutdownSendRetryAfter: true,
|
|
||||||
}
|
|
||||||
c.lifecycleSignals.HasBeenReady.Signal()
|
|
||||||
c.lifecycleSignals.AfterShutdownDelayDuration.Signal()
|
|
||||||
return c
|
|
||||||
}(),
|
|
||||||
addWithRetryAfterFilterExpected: true,
|
|
||||||
sendRetryAfterExpected: true,
|
|
||||||
retryAfterParamsExpected: &genericfilters.RetryAfterParams{
|
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
t.Run(test.name, func(t *testing.T) {
|
|
||||||
shouldRespondWithRetryAfterFn := test.config.shouldAddWithRetryAfterFilter()
|
|
||||||
|
|
||||||
// we need to sleep some time to allow the goroutine launched by
|
|
||||||
// shouldAddWithRetryAfterFilter to finish
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
if test.addWithRetryAfterFilterExpected != (shouldRespondWithRetryAfterFn != nil) {
|
|
||||||
t.Errorf("Expected add WithRetryAfter: %t, but got: %t", test.addWithRetryAfterFilterExpected, shouldRespondWithRetryAfterFn != nil)
|
|
||||||
}
|
|
||||||
if !test.addWithRetryAfterFilterExpected {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
paramsGot, sendRetryAfterGot := shouldRespondWithRetryAfterFn()
|
|
||||||
if test.sendRetryAfterExpected != sendRetryAfterGot {
|
|
||||||
t.Errorf("Expected send retry-after: %t, but got: %t", test.sendRetryAfterExpected, sendRetryAfterGot)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(test.retryAfterParamsExpected, paramsGot) {
|
|
||||||
t.Errorf("Expected retry-after params to match, diff: %s", cmp.Diff(test.retryAfterParamsExpected, paramsGot))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type testBackend struct {
|
type testBackend struct {
|
||||||
events []*auditinternal.Event
|
events []*auditinternal.Event
|
||||||
|
|
||||||
|
@ -36,8 +36,8 @@ var (
|
|||||||
// with a Retry-After response, otherwise it returns false.
|
// with a Retry-After response, otherwise it returns false.
|
||||||
type isRequestExemptFunc func(*http.Request) bool
|
type isRequestExemptFunc func(*http.Request) bool
|
||||||
|
|
||||||
// RetryAfterParams dictates how the Retry-After response is constructed
|
// retryAfterParams dictates how the Retry-After response is constructed
|
||||||
type RetryAfterParams struct {
|
type retryAfterParams struct {
|
||||||
// TearDownConnection is true when we should send a 'Connection: close'
|
// TearDownConnection is true when we should send a 'Connection: close'
|
||||||
// header in the response so net/http can tear down the TCP connection.
|
// header in the response so net/http can tear down the TCP connection.
|
||||||
TearDownConnection bool
|
TearDownConnection bool
|
||||||
@ -46,11 +46,11 @@ type RetryAfterParams struct {
|
|||||||
Message string
|
Message string
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShouldRespondWithRetryAfterFunc returns true if the requests should
|
// shouldRespondWithRetryAfterFunc returns true if the requests should
|
||||||
// be rejected with a Retry-After response once certain conditions are met.
|
// be rejected with a Retry-After response once certain conditions are met.
|
||||||
// The RetryAfterParams returned contains instructions on how to
|
// The retryAfterParams returned contains instructions on how to
|
||||||
// construct the Retry-After response.
|
// construct the Retry-After response.
|
||||||
type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool)
|
type shouldRespondWithRetryAfterFunc func() (*retryAfterParams, bool)
|
||||||
|
|
||||||
// WithRetryAfter rejects any incoming new request(s) with a 429
|
// WithRetryAfter rejects any incoming new request(s) with a 429
|
||||||
// if the specified shutdownDelayDurationElapsedFn channel is closed
|
// if the specified shutdownDelayDurationElapsedFn channel is closed
|
||||||
@ -62,13 +62,25 @@ type ShouldRespondWithRetryAfterFunc func() (*RetryAfterParams, bool)
|
|||||||
// - 'Connection: close': tear down the TCP connection
|
// - 'Connection: close': tear down the TCP connection
|
||||||
//
|
//
|
||||||
// TODO: is there a way to merge WithWaitGroup and this filter?
|
// TODO: is there a way to merge WithWaitGroup and this filter?
|
||||||
func WithRetryAfter(handler http.Handler, when ShouldRespondWithRetryAfterFunc) http.Handler {
|
func WithRetryAfter(handler http.Handler, shutdownDelayDurationElapsedCh <-chan struct{}) http.Handler {
|
||||||
|
shutdownRetryAfterParams := &retryAfterParams{
|
||||||
|
TearDownConnection: true,
|
||||||
|
Message: "The apiserver is shutting down, please try again later.",
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
|
// NOTE: both WithRetryAfter and WithWaitGroup must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
|
||||||
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
|
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
|
||||||
return withRetryAfter(handler, isRequestExemptFromRetryAfter, when)
|
return withRetryAfter(handler, isRequestExemptFromRetryAfter, func() (*retryAfterParams, bool) {
|
||||||
|
select {
|
||||||
|
case <-shutdownDelayDurationElapsedCh:
|
||||||
|
return shutdownRetryAfterParams, true
|
||||||
|
default:
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn ShouldRespondWithRetryAfterFunc) http.Handler {
|
func withRetryAfter(handler http.Handler, isRequestExemptFn isRequestExemptFunc, shouldRespondWithRetryAfterFn shouldRespondWithRetryAfterFunc) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
params, send := shouldRespondWithRetryAfterFn()
|
params, send := shouldRespondWithRetryAfterFn()
|
||||||
if !send || isRequestExemptFn(req) {
|
if !send || isRequestExemptFn(req) {
|
||||||
|
@ -27,20 +27,20 @@ import (
|
|||||||
|
|
||||||
func TestWithRetryAfter(t *testing.T) {
|
func TestWithRetryAfter(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
when ShouldRespondWithRetryAfterFunc
|
shutdownDelayDurationElapsedFn func() <-chan struct{}
|
||||||
requestURL string
|
requestURL string
|
||||||
userAgent string
|
userAgent string
|
||||||
safeWaitGroupIsWaiting bool
|
safeWaitGroupIsWaiting bool
|
||||||
handlerInvoked int
|
handlerInvoked int
|
||||||
closeExpected string
|
closeExpected string
|
||||||
retryAfterExpected string
|
retryAfterExpected string
|
||||||
statusCodeExpected int
|
statusCodeExpected int
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "retry-after disabled",
|
name: "retry-after disabled",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return nil, false
|
return newChannel(false)
|
||||||
},
|
},
|
||||||
requestURL: "/api/v1/namespaces",
|
requestURL: "/api/v1/namespaces",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -51,11 +51,8 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is not exempt",
|
name: "retry-after enabled, request is not exempt",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return &RetryAfterParams{
|
return newChannel(true)
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
},
|
||||||
requestURL: "/api/v1/namespaces",
|
requestURL: "/api/v1/namespaces",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -64,28 +61,10 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
retryAfterExpected: "5",
|
retryAfterExpected: "5",
|
||||||
statusCodeExpected: http.StatusTooManyRequests,
|
statusCodeExpected: http.StatusTooManyRequests,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "retry-after enabled, request is not exempt, no connection tear down",
|
|
||||||
when: func() (*RetryAfterParams, bool) {
|
|
||||||
return &RetryAfterParams{
|
|
||||||
TearDownConnection: false,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
|
||||||
requestURL: "/api/v1/namespaces",
|
|
||||||
userAgent: "foo",
|
|
||||||
handlerInvoked: 0,
|
|
||||||
closeExpected: "",
|
|
||||||
retryAfterExpected: "5",
|
|
||||||
statusCodeExpected: http.StatusTooManyRequests,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/metrics)",
|
name: "retry-after enabled, request is exempt(/metrics)",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return &RetryAfterParams{
|
return newChannel(true)
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
},
|
||||||
requestURL: "/metrics?foo=bar",
|
requestURL: "/metrics?foo=bar",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -96,11 +75,8 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/livez)",
|
name: "retry-after enabled, request is exempt(/livez)",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return &RetryAfterParams{
|
return newChannel(true)
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
},
|
||||||
requestURL: "/livez?verbose",
|
requestURL: "/livez?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -111,11 +87,8 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/readyz)",
|
name: "retry-after enabled, request is exempt(/readyz)",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return &RetryAfterParams{
|
return newChannel(true)
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
},
|
||||||
requestURL: "/readyz?verbose",
|
requestURL: "/readyz?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -126,11 +99,8 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/healthz)",
|
name: "retry-after enabled, request is exempt(/healthz)",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return &RetryAfterParams{
|
return newChannel(true)
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
},
|
||||||
requestURL: "/healthz?verbose",
|
requestURL: "/healthz?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -141,11 +111,8 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(local loopback)",
|
name: "retry-after enabled, request is exempt(local loopback)",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return &RetryAfterParams{
|
return newChannel(true)
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
},
|
||||||
requestURL: "/api/v1/namespaces",
|
requestURL: "/api/v1/namespaces",
|
||||||
userAgent: "kube-apiserver/",
|
userAgent: "kube-apiserver/",
|
||||||
@ -154,13 +121,22 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
retryAfterExpected: "",
|
retryAfterExpected: "",
|
||||||
statusCodeExpected: http.StatusOK,
|
statusCodeExpected: http.StatusOK,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "nil channel",
|
||||||
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
requestURL: "/api/v1/namespaces",
|
||||||
|
userAgent: "foo",
|
||||||
|
handlerInvoked: 1,
|
||||||
|
closeExpected: "",
|
||||||
|
retryAfterExpected: "",
|
||||||
|
statusCodeExpected: http.StatusOK,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
|
name: "retry-after enabled, request is exempt(/readyz), SafeWaitGroup is in waiting mode",
|
||||||
when: func() (*RetryAfterParams, bool) {
|
shutdownDelayDurationElapsedFn: func() <-chan struct{} {
|
||||||
return &RetryAfterParams{
|
return newChannel(true)
|
||||||
TearDownConnection: true,
|
|
||||||
Message: "The apiserver is shutting down, please try again later.",
|
|
||||||
}, true
|
|
||||||
},
|
},
|
||||||
requestURL: "/readyz?verbose",
|
requestURL: "/readyz?verbose",
|
||||||
userAgent: "foo",
|
userAgent: "foo",
|
||||||
@ -189,7 +165,7 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
|
wrapped := WithWaitGroup(handler, func(*http.Request, *apirequest.RequestInfo) bool {
|
||||||
return false
|
return false
|
||||||
}, safeWG)
|
}, safeWG)
|
||||||
wrapped = WithRetryAfter(wrapped, test.when)
|
wrapped = WithRetryAfter(wrapped, test.shutdownDelayDurationElapsedFn())
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
|
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -221,3 +197,11 @@ func TestWithRetryAfter(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newChannel(closed bool) <-chan struct{} {
|
||||||
|
ch := make(chan struct{})
|
||||||
|
if closed {
|
||||||
|
close(ch)
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
@ -518,8 +518,8 @@ func newGenericAPIServer(t *testing.T, keepListening bool) *GenericAPIServer {
|
|||||||
config.ShutdownSendRetryAfter = keepListening
|
config.ShutdownSendRetryAfter = keepListening
|
||||||
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
|
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
|
||||||
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
||||||
if shouldRetryAfterFn := c.shouldAddWithRetryAfterFilter(); shouldRetryAfterFn != nil {
|
if c.ShutdownSendRetryAfter {
|
||||||
handler = genericfilters.WithRetryAfter(handler, shouldRetryAfterFn)
|
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
|
||||||
}
|
}
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
||||||
return handler
|
return handler
|
||||||
|
@ -63,31 +63,21 @@ type ServerRunOptions struct {
|
|||||||
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
|
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
|
||||||
// rejected with a 429 status code and a 'Retry-After' response.
|
// rejected with a 429 status code and a 'Retry-After' response.
|
||||||
ShutdownSendRetryAfter bool
|
ShutdownSendRetryAfter bool
|
||||||
|
|
||||||
// StartupSendRetryAfterUntilReady once set will reject incoming requests with
|
|
||||||
// a 429 status code and a 'Retry-After' response header until the apiserver
|
|
||||||
// hasn't fully initialized.
|
|
||||||
// This option ensures that the system stays consistent even when requests
|
|
||||||
// are received before the server has been initialized.
|
|
||||||
// In particular, it prevents child deletion in case of GC or/and orphaned
|
|
||||||
// content in case of the namespaces controller.
|
|
||||||
StartupSendRetryAfterUntilReady bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServerRunOptions() *ServerRunOptions {
|
func NewServerRunOptions() *ServerRunOptions {
|
||||||
defaults := server.NewConfig(serializer.CodecFactory{})
|
defaults := server.NewConfig(serializer.CodecFactory{})
|
||||||
return &ServerRunOptions{
|
return &ServerRunOptions{
|
||||||
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
|
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
|
||||||
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
|
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
|
||||||
RequestTimeout: defaults.RequestTimeout,
|
RequestTimeout: defaults.RequestTimeout,
|
||||||
LivezGracePeriod: defaults.LivezGracePeriod,
|
LivezGracePeriod: defaults.LivezGracePeriod,
|
||||||
MinRequestTimeout: defaults.MinRequestTimeout,
|
MinRequestTimeout: defaults.MinRequestTimeout,
|
||||||
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
|
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
|
||||||
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
|
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
|
||||||
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
|
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
|
||||||
EnablePriorityAndFairness: true,
|
EnablePriorityAndFairness: true,
|
||||||
ShutdownSendRetryAfter: false,
|
ShutdownSendRetryAfter: false,
|
||||||
StartupSendRetryAfterUntilReady: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,7 +97,6 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
|
|||||||
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
|
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
|
||||||
c.PublicAddress = s.AdvertiseAddress
|
c.PublicAddress = s.AdvertiseAddress
|
||||||
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
|
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
|
||||||
c.StartupSendRetryAfterUntilReady = s.StartupSendRetryAfterUntilReady
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -272,10 +261,5 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
|
|||||||
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
|
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
|
||||||
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
|
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
|
||||||
|
|
||||||
fs.BoolVar(&s.StartupSendRetryAfterUntilReady, "startup-send-retry-after-until-ready", s.ShutdownSendRetryAfter, ""+
|
|
||||||
"If true, incoming request(s) will be rejected with a '429' status code and a 'Retry-After' response header "+
|
|
||||||
"until the apiserver has initialized. This option ensures that the system stays consistent even when requests "+
|
|
||||||
"arrive at the server before it has been initialized.")
|
|
||||||
|
|
||||||
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
|
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user