apiserver: refactor graceful termination logic

- refactor graceful termination logic so we can write unit tests
  to assert on the expected behavior.
This commit is contained in:
Abu Kashem 2021-06-24 15:51:38 -04:00
parent 513ae557a3
commit d85619030e
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
3 changed files with 155 additions and 13 deletions

View File

@ -219,6 +219,11 @@ type Config struct {
// RequestWidthEstimator is used to estimate the "width" of the incoming request(s).
RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc
// terminationSignals provides access to the various shutdown signals
// that happen during the graceful termination of the apiserver.
// it's intentionally marked private as it should never be overridden.
terminationSignals terminationSignals
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -343,6 +348,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator,
terminationSignals: newTerminationSignals(),
APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),
@ -589,7 +595,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
healthzChecks: c.HealthzChecks,
livezChecks: c.LivezChecks,
readyzChecks: c.ReadyzChecks,
readinessStopCh: make(chan struct{}),
livezGracePeriod: c.LivezGracePeriod,
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
@ -597,6 +602,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{},
terminationSignals: c.terminationSignals,
APIServerID: c.APIServerID,
StorageVersionManager: c.StorageVersionManager,

View File

@ -174,9 +174,6 @@ type GenericAPIServer struct {
readyzChecksInstalled bool
livezGracePeriod time.Duration
livezClock clock.Clock
// the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this
// will cause readyz to return unhealthy.
readinessStopCh chan struct{}
// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
@ -213,6 +210,10 @@ type GenericAPIServer struct {
// Version will enable the /version endpoint if non-nil
Version *version.Info
// terminationSignals provides access to the various termination
// signals that happen during the shutdown period of the apiserver.
terminationSignals terminationSignals
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -307,7 +308,10 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
s.installHealthz()
s.installLivez()
err := s.addReadyzShutdownCheck(s.readinessStopCh)
// as soon as shutdown is initiated, readiness should start failing
readinessStopCh := s.terminationSignals.ShutdownInitiated.Signaled()
err := s.addReadyzShutdownCheck(readinessStopCh)
if err != nil {
klog.Errorf("Failed to install readyz shutdown check %s", err)
}
@ -330,38 +334,40 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := make(chan struct{})
delayedStopCh := s.terminationSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.terminationSignals.ShutdownInitiated
go func() {
defer close(delayedStopCh)
defer delayedStopCh.Signal()
<-stopCh
// As soon as shutdown is initiated, /readyz should start returning failure.
// This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red
// and stop sending traffic to this server.
close(s.readinessStopCh)
shutdownInitiatedCh.Signal()
time.Sleep(s.ShutdownDelayDuration)
}()
// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
stoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled())
if err != nil {
return err
}
drainedCh := make(chan struct{})
drainedCh := s.terminationSignals.InFlightRequestsDrained
go func() {
defer close(drainedCh)
defer drainedCh.Signal()
// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
<-delayedStopCh
<-delayedStopCh.Signaled()
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
}()
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
<-stopCh
// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
@ -369,12 +375,14 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
if err != nil {
return err
}
klog.V(1).Info("[graceful-termination] RunPreShutdownHooks has completed")
// Wait for all requests in flight to drain, bounded by the RequestTimeout variable.
<-drainedCh
<-drainedCh.Signaled()
// wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished.
<-stoppedCh
klog.V(1).Info("[graceful-termination] apiserver is exiting")
return nil
}

View File

@ -0,0 +1,127 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"k8s.io/klog/v2"
)
/*
We make an attempt here to identify the events that take place during
the graceful shutdown of the apiserver.
We also identify each event with a name so we can refer to it.
Events:
- ShutdownInitiated: KILL signal received
- AfterShutdownDelayDuration: shutdown delay duration has passed
- InFlightRequestsDrained: all in flight request(s) have been drained
The following is a sequence of shutdown events that we expect to see during termination:
T0: ShutdownInitiated: KILL signal received
- /readyz starts returning red
- run pre shutdown hooks
T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed
- the default value of 'ShutdownDelayDuration' is '70s'
- it's time to initiate shutdown of the HTTP Server, server.Shutdown is invoked
- as a consequence, the Close function is called for all listeners
- the HTTP Server stops listening immediately
- any new request arriving on a new TCP socket is denied with
a network error similar to 'connection refused'
- the HTTP Server waits gracefully for existing requests to complete
up to '60s' (dictated by ShutdownTimeout)
- active long running requests will receive a GOAWAY.
T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained
- long running requests are outside of this scope
- up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that
any request in flight has a hard timeout of 60s.
- it's time to call 'Shutdown' on the audit events since all
in flight request(s) have drained.
*/
// terminationSignal encapsulates a named apiserver termination event.
// The party that observes the event calls Signal; any party that needs to
// react to the event waits by receiving from the channel returned by Signaled.
type terminationSignal interface {
	// Signal signals the event, indicating that the event has occurred.
	// Signal is idempotent: once signaled, the event stays signaled and
	// it immediately unblocks any goroutine waiting for this event.
	Signal()

	// Signaled returns a receive-only channel that is closed when the
	// underlying termination event has been signaled. Successive calls to
	// Signaled return the same value, so the result may be cached by callers.
	Signaled() <-chan struct{}
}
// terminationSignals provides an abstraction of the termination events that
// transpire during the shutdown period of the apiserver. This abstraction makes it easy
// for us to write unit tests that can verify expected graceful termination behavior.
//
// GenericAPIServer can use these to either:
//   - signal that a particular termination event has transpired
//   - wait for a designated termination event to transpire and do some action.
//
// The fields are listed in the order in which the events are expected to
// be signaled during a graceful termination.
type terminationSignals struct {
	// ShutdownInitiated event is signaled when an apiserver shutdown has been
	// initiated, that is, as soon as the `stopCh` handed to Run is closed.
	// The readyz shutdown check is wired to this event, so readiness starts
	// failing once it has been signaled.
	ShutdownInitiated terminationSignal

	// AfterShutdownDelayDuration event is signaled as soon as ShutdownDelayDuration
	// has elapsed since the ShutdownInitiated event.
	// ShutdownDelayDuration allows the apiserver to delay shutdown for some time.
	AfterShutdownDelayDuration terminationSignal

	// InFlightRequestsDrained event is signaled when the existing requests
	// in flight have completed, which is only waited for after the
	// AfterShutdownDelayDuration event. It is intended to be used as the
	// signal to shut down the audit backends.
	InFlightRequestsDrained terminationSignal
}
// newTerminationSignals builds the terminationSignals value used to
// coordinate graceful termination of the apiserver. Every event gets its
// own named channel wrapper, labelled with the name of the field it is
// stored in.
func newTerminationSignals() terminationSignals {
	var signals terminationSignals

	signals.ShutdownInitiated = newNamedChannelWrapper("ShutdownInitiated")
	signals.AfterShutdownDelayDuration = newNamedChannelWrapper("AfterShutdownDelayDuration")
	signals.InFlightRequestsDrained = newNamedChannelWrapper("InFlightRequestsDrained")

	return signals
}
// newNamedChannelWrapper returns a terminationSignal backed by a new,
// still open, unbuffered channel. The given name is what Signal reports
// in its log line when the event is signaled.
func newNamedChannelWrapper(name string) terminationSignal {
	wrapper := new(namedChannelWrapper)
	wrapper.name = name
	wrapper.ch = make(chan struct{})
	return wrapper
}
// namedChannelWrapper is the channel based implementation of
// terminationSignal: the event counts as signaled once ch has been closed.
type namedChannelWrapper struct {
	// name identifies the event; it is included in the log line emitted
	// when the event is signaled.
	name string
	// ch is closed by Signal and exposed, receive-only, by Signaled.
	ch chan struct{}
}
// Signal marks the event as having occurred by closing the underlying
// channel, and logs the name of the event the first time this happens.
// Calling Signal again after the channel has been closed is a no-op.
//
// NOTE(review): the "already closed" check and the close are two separate
// steps, so this presumably relies on each event being signaled from a
// single goroutine - confirm before calling Signal concurrently.
func (e *namedChannelWrapper) Signal() {
	select {
	case <-e.ch:
		// The channel is already closed; closing it again would panic.
		return
	default:
	}

	close(e.ch)
	klog.V(1).InfoS("[graceful-termination] shutdown event", "name", e.name)
}
// Signaled returns the underlying channel as a receive-only channel.
// The channel is closed by Signal, so a receive from it blocks until the
// event has been signaled and returns immediately afterwards. Every call
// returns the same channel.
func (e *namedChannelWrapper) Signaled() <-chan struct{} {
	return e.ch
}