diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 27ee09b4825..c5f995986aa 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -219,6 +219,11 @@ type Config struct { // RequestWidthEstimator is used to estimate the "width" of the incoming request(s). RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc + // terminationSignals provides access to the various shutdown signals + // that happen during the graceful termination of the apiserver. + // it's intentionally marked private as it should never be overridden. + terminationSignals terminationSignals + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -343,6 +348,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Generic API servers have no inherent long-running subresources LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, + terminationSignals: newTerminationSignals(), APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), @@ -589,7 +595,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G healthzChecks: c.HealthzChecks, livezChecks: c.LivezChecks, readyzChecks: c.ReadyzChecks, - readinessStopCh: make(chan struct{}), livezGracePeriod: c.LivezGracePeriod, DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer), @@ -597,6 +602,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G maxRequestBodyBytes: c.MaxRequestBodyBytes, livezClock: clock.RealClock{}, + terminationSignals: c.terminationSignals, + APIServerID: c.APIServerID, StorageVersionManager: c.StorageVersionManager, diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 20ff9363c9a..cf2ffae163e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -174,9 +174,6 @@ type GenericAPIServer struct { readyzChecksInstalled bool livezGracePeriod time.Duration livezClock clock.Clock - // the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this - // will cause readyz to return unhealthy. - readinessStopCh chan struct{} // auditing. The backend is started after the server starts listening. AuditBackend audit.Backend @@ -213,6 +210,10 @@ type GenericAPIServer struct { // Version will enable the /version endpoint if non-nil Version *version.Info + + // terminationSignals provides access to the various termination + // signals that happen during the shutdown period of the apiserver. + terminationSignals terminationSignals } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -307,7 +308,10 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { s.installHealthz() s.installLivez() - err := s.addReadyzShutdownCheck(s.readinessStopCh) + + // as soon as shutdown is initiated, readiness should start failing + readinessStopCh := s.terminationSignals.ShutdownInitiated.Signaled() + err := s.addReadyzShutdownCheck(readinessStopCh) if err != nil { klog.Errorf("Failed to install readyz shutdown check %s", err) } @@ -330,38 +334,40 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // Run spawns the secure http server. It only returns if stopCh is closed // or the secure port cannot be listened on initially. func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { - delayedStopCh := make(chan struct{}) + delayedStopCh := s.terminationSignals.AfterShutdownDelayDuration + shutdownInitiatedCh := s.terminationSignals.ShutdownInitiated go func() { - defer close(delayedStopCh) + defer delayedStopCh.Signal() <-stopCh // As soon as shutdown is initiated, /readyz should start returning failure. // This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red // and stop sending traffic to this server. - close(s.readinessStopCh) + shutdownInitiatedCh.Signal() time.Sleep(s.ShutdownDelayDuration) }() // close socket after delayed stopCh - stoppedCh, err := s.NonBlockingRun(delayedStopCh) + stoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled()) if err != nil { return err } - drainedCh := make(chan struct{}) + drainedCh := s.terminationSignals.InFlightRequestsDrained go func() { - defer close(drainedCh) + defer drainedCh.Signal() // wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called). - <-delayedStopCh + <-delayedStopCh.Signaled() // Wait for all requests to finish, which are bounded by the RequestTimeout variable. s.HandlerChainWaitGroup.Wait() }() + klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated") <-stopCh // run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver. @@ -369,12 +375,14 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { if err != nil { return err } + klog.V(1).Info("[graceful-termination] RunPreShutdownHooks has completed") // Wait for all requests in flight to drain, bounded by the RequestTimeout variable. - <-drainedCh + <-drainedCh.Signaled() // wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished. <-stoppedCh + klog.V(1).Info("[graceful-termination] apiserver is exiting") return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go b/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go new file mode 100644 index 00000000000..0841a017ef2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go @@ -0,0 +1,127 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "k8s.io/klog/v2" +) + +/* +We make an attempt here to identify the events that take place during +the graceful shutdown of the apiserver. + +We also identify each event with a name so we can refer to it. + +Events: +- ShutdownInitiated: KILL signal received +- AfterShutdownDelayDuration: shutdown delay duration has passed +- InFlightRequestsDrained: all in flight request(s) have been drained + +The following is a sequence of shutdown events that we expect to see during termination: +T0: ShutdownInitiated: KILL signal received + - /readyz starts returning red + - run pre shutdown hooks + +T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed + - the default value of 'ShutdownDelayDuration' is '70s' + - it's time to initiate shutdown of the HTTP Server, server.Shutdown is invoked + - as a consequene, the Close function has is called for all listeners + - the HTTP Server stops listening immediately + - any new request arriving on a new TCP socket is denied with + a network error similar to 'connection refused' + - the HTTP Server waits gracefully for existing requests to complete + up to '60s' (dictated by ShutdownTimeout) + - active long running requests will receive a GOAWAY. + +T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained + - long running requests are outside of this scope + - up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that + any request in flight has a hard timeout of 60s. + - it's time to call 'Shutdown' on the audit events since all + in flight request(s) have drained. +*/ + +// terminationSignal encapsulates a named apiserver termination event +type terminationSignal interface { + // Signal signals the event, indicating that the event has occurred. + // Signal is idempotent, once signaled the event stays signaled and + // it immediately unblocks any goroutine waiting for this event. + Signal() + + // Signaled returns a channel that is closed when the underlying termination + // event has been signaled. Successive calls to Signaled return the same value. + Signaled() <-chan struct{} +} + +// terminationSignals provides an abstraction of the termination events that +// transpire during the shutdown period of the apiserver. This abstraction makes it easy +// for us to write unit tests that can verify expected graceful termination behavior. +// +// GenericAPIServer can use these to either: +// - signal that a particular termination event has transpired +// - wait for a designated termination event to transpire and do some action. +type terminationSignals struct { + // ShutdownInitiated event is signaled when an apiserver shutdown has been initiated. + // It is signaled when the `stopCh` provided by the main goroutine + // receives a KILL signal and is closed as a consequence. + ShutdownInitiated terminationSignal + + // AfterShutdownDelayDuration event is signaled as soon as ShutdownDelayDuration + // has elapsed since the ShutdownInitiated event. + // ShutdownDelayDuration allows the apiserver to delay shutdown for some time. + AfterShutdownDelayDuration terminationSignal + + // InFlightRequestsDrained event is signaled when the existing requests + // in flight have completed. This is used as signal to shut down the audit backends + InFlightRequestsDrained terminationSignal +} + +// newTerminationSignals returns an instance of terminationSignals interface to be used +// to coordinate graceful termination of the apiserver +func newTerminationSignals() terminationSignals { + return terminationSignals{ + ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"), + AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"), + InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), + } +} + +func newNamedChannelWrapper(name string) terminationSignal { + return &namedChannelWrapper{ + name: name, + ch: make(chan struct{}), + } +} + +type namedChannelWrapper struct { + name string + ch chan struct{} +} + +func (e *namedChannelWrapper) Signal() { + select { + case <-e.ch: + // already closed, don't close again. + default: + close(e.ch) + klog.V(1).InfoS("[graceful-termination] shutdown event", "name", e.name) + } +} + +func (e *namedChannelWrapper) Signaled() <-chan struct{} { + return e.ch +}