mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
apiserver: make watch termination during shutdown configurable
This commit is contained in:
parent
6385b86a9b
commit
791fcd6fb4
@ -276,6 +276,23 @@ type Config struct {
|
|||||||
|
|
||||||
// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
|
// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
|
||||||
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager
|
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager
|
||||||
|
|
||||||
|
// ShutdownWatchTerminationGracePeriod, if set to a positive value,
|
||||||
|
// is the maximum duration the apiserver will wait for all active
|
||||||
|
// watch request(s) to drain.
|
||||||
|
// Once this grace period elapses, the apiserver will no longer
|
||||||
|
// wait for any active watch request(s) in flight to drain, it will
|
||||||
|
// proceed to the next step in the graceful server shutdown process.
|
||||||
|
// If set to a positive value, the apiserver will keep track of the
|
||||||
|
// number of active watch request(s) in flight and during shutdown
|
||||||
|
// it will wait, at most, for the specified duration and allow these
|
||||||
|
// active watch requests to drain with some rate limiting in effect.
|
||||||
|
// The default is zero, which implies the apiserver will not keep
|
||||||
|
// track of active watch request(s) in flight and will not wait
|
||||||
|
// for them to drain, this maintains backward compatibility.
|
||||||
|
// This grace period is orthogonal to other grace periods, and
|
||||||
|
// it is not overridden by any other grace period.
|
||||||
|
ShutdownWatchTerminationGracePeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type RecommendedConfig struct {
|
type RecommendedConfig struct {
|
||||||
@ -413,9 +430,10 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
|||||||
|
|
||||||
// Default to treating watch as a long-running operation
|
// Default to treating watch as a long-running operation
|
||||||
// Generic API servers have no inherent long-running subresources
|
// Generic API servers have no inherent long-running subresources
|
||||||
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
||||||
lifecycleSignals: lifecycleSignals,
|
lifecycleSignals: lifecycleSignals,
|
||||||
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
|
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
|
||||||
|
ShutdownWatchTerminationGracePeriod: time.Duration(0),
|
||||||
|
|
||||||
APIServerID: id,
|
APIServerID: id,
|
||||||
StorageVersionManager: storageversion.NewDefaultManager(),
|
StorageVersionManager: storageversion.NewDefaultManager(),
|
||||||
@ -681,11 +699,12 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
|
|
||||||
listedPathProvider: apiServerHandler,
|
listedPathProvider: apiServerHandler,
|
||||||
|
|
||||||
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
||||||
ShutdownTimeout: c.RequestTimeout,
|
ShutdownTimeout: c.RequestTimeout,
|
||||||
ShutdownDelayDuration: c.ShutdownDelayDuration,
|
ShutdownDelayDuration: c.ShutdownDelayDuration,
|
||||||
SecureServingInfo: c.SecureServing,
|
ShutdownWatchTerminationGracePeriod: c.ShutdownWatchTerminationGracePeriod,
|
||||||
ExternalAddress: c.ExternalAddress,
|
SecureServingInfo: c.SecureServing,
|
||||||
|
ExternalAddress: c.ExternalAddress,
|
||||||
|
|
||||||
openAPIConfig: c.OpenAPIConfig,
|
openAPIConfig: c.OpenAPIConfig,
|
||||||
openAPIV3Config: c.OpenAPIV3Config,
|
openAPIV3Config: c.OpenAPIV3Config,
|
||||||
@ -913,7 +932,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
|
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
|
||||||
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
|
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
|
||||||
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
|
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
|
||||||
handler = genericfilters.WithWatchTerminationDuringShutdown(handler, c.lifecycleSignals, c.WatchRequestWaitGroup)
|
if c.ShutdownWatchTerminationGracePeriod > 0 {
|
||||||
|
handler = genericfilters.WithWatchTerminationDuringShutdown(handler, c.lifecycleSignals, c.WatchRequestWaitGroup)
|
||||||
|
}
|
||||||
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
|
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
|
||||||
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
|
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
|
||||||
}
|
}
|
||||||
|
@ -266,6 +266,23 @@ type GenericAPIServer struct {
|
|||||||
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
|
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
|
||||||
// rejected with a 429 status code and a 'Retry-After' response.
|
// rejected with a 429 status code and a 'Retry-After' response.
|
||||||
ShutdownSendRetryAfter bool
|
ShutdownSendRetryAfter bool
|
||||||
|
|
||||||
|
// ShutdownWatchTerminationGracePeriod, if set to a positive value,
|
||||||
|
// is the maximum duration the apiserver will wait for all active
|
||||||
|
// watch request(s) to drain.
|
||||||
|
// Once this grace period elapses, the apiserver will no longer
|
||||||
|
// wait for any active watch request(s) in flight to drain, it will
|
||||||
|
// proceed to the next step in the graceful server shutdown process.
|
||||||
|
// If set to a positive value, the apiserver will keep track of the
|
||||||
|
// number of active watch request(s) in flight and during shutdown
|
||||||
|
// it will wait, at most, for the specified duration and allow these
|
||||||
|
// active watch requests to drain with some rate limiting in effect.
|
||||||
|
// The default is zero, which implies the apiserver will not keep
|
||||||
|
// track of active watch request(s) in flight and will not wait
|
||||||
|
// for them to drain, this maintains backward compatibility.
|
||||||
|
// This grace period is orthogonal to other grace periods, and
|
||||||
|
// it is not overridden by any other grace period.
|
||||||
|
ShutdownWatchTerminationGracePeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
||||||
@ -617,10 +634,13 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
defer close(activeWatchesDrainedCh)
|
defer close(activeWatchesDrainedCh)
|
||||||
|
|
||||||
<-notAcceptingNewRequestCh.Signaled()
|
<-notAcceptingNewRequestCh.Signaled()
|
||||||
|
if s.ShutdownWatchTerminationGracePeriod <= time.Duration(0) {
|
||||||
|
klog.V(1).InfoS("[graceful-termination] not going to wait for active watch request(s) to drain")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Wait for all active watches to finish
|
// Wait for all active watches to finish
|
||||||
// TODO(tkashem): make the grace period configurable
|
grace := s.ShutdownWatchTerminationGracePeriod
|
||||||
grace := 10 * time.Second
|
|
||||||
activeBefore, activeAfter, err := s.WatchRequestWaitGroup.Wait(func(count int) (utilwaitgroup.RateLimiter, context.Context, context.CancelFunc) {
|
activeBefore, activeAfter, err := s.WatchRequestWaitGroup.Wait(func(count int) (utilwaitgroup.RateLimiter, context.Context, context.CancelFunc) {
|
||||||
qps := float64(count) / grace.Seconds()
|
qps := float64(count) / grace.Seconds()
|
||||||
// TODO: we don't want the QPS (max requests drained per second) to
|
// TODO: we don't want the QPS (max requests drained per second) to
|
||||||
|
@ -1011,6 +1011,8 @@ func newGenericAPIServer(t *testing.T, fAudit *fakeAudit, keepListening bool) *G
|
|||||||
config, _ := setUp(t)
|
config, _ := setUp(t)
|
||||||
config.ShutdownDelayDuration = 100 * time.Millisecond
|
config.ShutdownDelayDuration = 100 * time.Millisecond
|
||||||
config.ShutdownSendRetryAfter = keepListening
|
config.ShutdownSendRetryAfter = keepListening
|
||||||
|
// we enable watch draining, any positive value will do that
|
||||||
|
config.ShutdownWatchTerminationGracePeriod = 2 * time.Second
|
||||||
config.AuditPolicyRuleEvaluator = fAudit
|
config.AuditPolicyRuleEvaluator = fAudit
|
||||||
config.AuditBackend = fAudit
|
config.AuditBackend = fAudit
|
||||||
|
|
||||||
|
@ -73,21 +73,39 @@ type ServerRunOptions struct {
|
|||||||
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
|
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
|
||||||
// rejected with a 429 status code and a 'Retry-After' response.
|
// rejected with a 429 status code and a 'Retry-After' response.
|
||||||
ShutdownSendRetryAfter bool
|
ShutdownSendRetryAfter bool
|
||||||
|
|
||||||
|
// ShutdownWatchTerminationGracePeriod, if set to a positive value,
|
||||||
|
// is the maximum duration the apiserver will wait for all active
|
||||||
|
// watch request(s) to drain.
|
||||||
|
// Once this grace period elapses, the apiserver will no longer
|
||||||
|
// wait for any active watch request(s) in flight to drain, it will
|
||||||
|
// proceed to the next step in the graceful server shutdown process.
|
||||||
|
// If set to a positive value, the apiserver will keep track of the
|
||||||
|
// number of active watch request(s) in flight and during shutdown
|
||||||
|
// it will wait, at most, for the specified duration and allow these
|
||||||
|
// active watch requests to drain with some rate limiting in effect.
|
||||||
|
// The default is zero, which implies the apiserver will not keep
|
||||||
|
// track of active watch request(s) in flight and will not wait
|
||||||
|
// for them to drain, this maintains backward compatibility.
|
||||||
|
// This grace period is orthogonal to other grace periods, and
|
||||||
|
// it is not overridden by any other grace period.
|
||||||
|
ShutdownWatchTerminationGracePeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServerRunOptions() *ServerRunOptions {
|
func NewServerRunOptions() *ServerRunOptions {
|
||||||
defaults := server.NewConfig(serializer.CodecFactory{})
|
defaults := server.NewConfig(serializer.CodecFactory{})
|
||||||
return &ServerRunOptions{
|
return &ServerRunOptions{
|
||||||
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
|
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
|
||||||
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
|
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
|
||||||
RequestTimeout: defaults.RequestTimeout,
|
RequestTimeout: defaults.RequestTimeout,
|
||||||
LivezGracePeriod: defaults.LivezGracePeriod,
|
LivezGracePeriod: defaults.LivezGracePeriod,
|
||||||
MinRequestTimeout: defaults.MinRequestTimeout,
|
MinRequestTimeout: defaults.MinRequestTimeout,
|
||||||
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
|
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
|
||||||
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
|
ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod,
|
||||||
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
|
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
|
||||||
EnablePriorityAndFairness: true,
|
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
|
||||||
ShutdownSendRetryAfter: false,
|
EnablePriorityAndFairness: true,
|
||||||
|
ShutdownSendRetryAfter: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,6 +125,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
|
|||||||
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
|
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
|
||||||
c.PublicAddress = s.AdvertiseAddress
|
c.PublicAddress = s.AdvertiseAddress
|
||||||
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
|
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
|
||||||
|
c.ShutdownWatchTerminationGracePeriod = s.ShutdownWatchTerminationGracePeriod
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -160,6 +179,10 @@ func (s *ServerRunOptions) Validate() []error {
|
|||||||
errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value"))
|
errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.ShutdownWatchTerminationGracePeriod < 0 {
|
||||||
|
errors = append(errors, fmt.Errorf("shutdown-watch-termination-grace-period, if provided, can not be a negative value"))
|
||||||
|
}
|
||||||
|
|
||||||
if s.JSONPatchMaxCopyBytes < 0 {
|
if s.JSONPatchMaxCopyBytes < 0 {
|
||||||
errors = append(errors, fmt.Errorf("ServerRunOptions.JSONPatchMaxCopyBytes can not be negative value"))
|
errors = append(errors, fmt.Errorf("ServerRunOptions.JSONPatchMaxCopyBytes can not be negative value"))
|
||||||
}
|
}
|
||||||
@ -315,5 +338,9 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
|
|||||||
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
|
"during this window all incoming requests will be rejected with a status code 429 and a 'Retry-After' response header, "+
|
||||||
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
|
"in addition 'Connection: close' response header is set in order to tear down the TCP connection when idle.")
|
||||||
|
|
||||||
|
fs.DurationVar(&s.ShutdownWatchTerminationGracePeriod, "shutdown-watch-termination-grace-period", s.ShutdownWatchTerminationGracePeriod, ""+
|
||||||
|
"This option, if set, represents the maximum amount of grace period the apiserver will wait "+
|
||||||
|
"for active watch request(s) to drain during the graceful server shutdown window.")
|
||||||
|
|
||||||
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
|
utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
|
||||||
}
|
}
|
||||||
|
@ -261,3 +261,52 @@ func TestValidateCorsAllowedOriginList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServerRunOptionsWithShutdownWatchTerminationGracePeriod(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
optionsFn func() *ServerRunOptions
|
||||||
|
errShouldContain string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default should be valid",
|
||||||
|
optionsFn: func() *ServerRunOptions {
|
||||||
|
return NewServerRunOptions()
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "negative not allowed",
|
||||||
|
optionsFn: func() *ServerRunOptions {
|
||||||
|
o := NewServerRunOptions()
|
||||||
|
o.ShutdownWatchTerminationGracePeriod = -time.Second
|
||||||
|
return o
|
||||||
|
},
|
||||||
|
errShouldContain: "shutdown-watch-termination-grace-period, if provided, can not be a negative value",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
options := test.optionsFn()
|
||||||
|
errsGot := options.Validate()
|
||||||
|
switch {
|
||||||
|
case len(test.errShouldContain) == 0:
|
||||||
|
if len(errsGot) != 0 {
|
||||||
|
t.Errorf("expected no error, but got: %v", errsGot)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if len(errsGot) == 0 ||
|
||||||
|
!strings.Contains(utilerrors.NewAggregate(errsGot).Error(), test.errShouldContain) {
|
||||||
|
t.Errorf("expected error to contain: %s, but got: %v", test.errShouldContain, errsGot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("default should be zero", func(t *testing.T) {
|
||||||
|
options := NewServerRunOptions()
|
||||||
|
if options.ShutdownWatchTerminationGracePeriod != time.Duration(0) {
|
||||||
|
t.Errorf("expected default of ShutdownWatchTerminationGracePeriod to be zero, but got: %s", options.ShutdownWatchTerminationGracePeriod)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user