Merge pull request #103432 from p0lyn0mial/lifecycle_events

simply renames terminationSignals to lifecycleSignals
This commit is contained in:
Kubernetes Prow Robot 2021-07-02 05:44:13 -07:00 committed by GitHub
commit 93119f4503
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 50 deletions

View File

@ -225,10 +225,10 @@ type Config struct {
// RequestWidthEstimator is used to estimate the "width" of the incoming request(s). // RequestWidthEstimator is used to estimate the "width" of the incoming request(s).
RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc
// terminationSignals provides access to the various shutdown signals // lifecycleSignals provides access to the various signals
// that happen during the graceful termination of the apiserver. // that happen during lifecycle of the apiserver.
// it's intentionally marked private as it should never be overridden. // it's intentionally marked private as it should never be overridden.
terminationSignals terminationSignals lifecycleSignals lifecycleSignals
//=========================================================================== //===========================================================================
// values below here are targets for removal // values below here are targets for removal
@ -354,7 +354,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Generic API servers have no inherent long-running subresources // Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator,
terminationSignals: newTerminationSignals(), lifecycleSignals: newLifecycleSignals(),
APIServerID: id, APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(), StorageVersionManager: storageversion.NewDefaultManager(),
@ -608,7 +608,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
maxRequestBodyBytes: c.MaxRequestBodyBytes, maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{}, livezClock: clock.RealClock{},
terminationSignals: c.terminationSignals, lifecycleSignals: c.lifecycleSignals,
APIServerID: c.APIServerID, APIServerID: c.APIServerID,
StorageVersionManager: c.StorageVersionManager, StorageVersionManager: c.StorageVersionManager,

View File

@ -211,9 +211,8 @@ type GenericAPIServer struct {
// Version will enable the /version endpoint if non-nil // Version will enable the /version endpoint if non-nil
Version *version.Info Version *version.Info
// terminationSignals provides access to the various termination // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
// signals that happen during the shutdown period of the apiserver. lifecycleSignals lifecycleSignals
terminationSignals terminationSignals
} }
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works // DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -310,7 +309,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
s.installLivez() s.installLivez()
// as soon as shutdown is initiated, readiness should start failing // as soon as shutdown is initiated, readiness should start failing
readinessStopCh := s.terminationSignals.ShutdownInitiated.Signaled() readinessStopCh := s.lifecycleSignals.ShutdownInitiated.Signaled()
err := s.addReadyzShutdownCheck(readinessStopCh) err := s.addReadyzShutdownCheck(readinessStopCh)
if err != nil { if err != nil {
klog.Errorf("Failed to install readyz shutdown check %s", err) klog.Errorf("Failed to install readyz shutdown check %s", err)
@ -334,11 +333,12 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the secure http server. It only returns if stopCh is closed // Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially. // or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := s.terminationSignals.AfterShutdownDelayDuration delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.terminationSignals.ShutdownInitiated shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
go func() { go func() {
defer delayedStopCh.Signal() defer delayedStopCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
<-stopCh <-stopCh
@ -346,6 +346,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red // This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red
// and stop sending traffic to this server. // and stop sending traffic to this server.
shutdownInitiatedCh.Signal() shutdownInitiatedCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
time.Sleep(s.ShutdownDelayDuration) time.Sleep(s.ShutdownDelayDuration)
}() }()
@ -355,15 +356,17 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
if err != nil { if err != nil {
return err return err
} }
httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
go func() { go func() {
<-listenerStoppedCh <-listenerStoppedCh
httpServerStoppedListeningCh.Signal() httpServerStoppedListeningCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
}() }()
drainedCh := s.terminationSignals.InFlightRequestsDrained drainedCh := s.lifecycleSignals.InFlightRequestsDrained
go func() { go func() {
defer drainedCh.Signal() defer drainedCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called). // wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
<-delayedStopCh.Signaled() <-delayedStopCh.Signaled()

View File

@ -52,32 +52,32 @@ type result struct {
response *http.Response response *http.Response
} }
// wrap a terminationSignal so the test can inject its own callback // wrap a lifecycleSignal so the test can inject its own callback
type wrappedTerminationSignal struct { type wrappedTerminationSignal struct {
terminationSignal lifecycleSignal
callback func(bool, string, terminationSignal) callback func(bool, string, lifecycleSignal)
} }
func (w *wrappedTerminationSignal) Signal() { func (w *wrappedTerminationSignal) Signal() {
var name string var name string
if ncw, ok := w.terminationSignal.(*namedChannelWrapper); ok { if ncw, ok := w.lifecycleSignal.(*namedChannelWrapper); ok {
name = ncw.name name = ncw.name
} }
// the callback is invoked before and after the termination event is signaled // the callback is invoked before and after the termination event is signaled
if w.callback != nil { if w.callback != nil {
w.callback(true, name, w.terminationSignal) w.callback(true, name, w.lifecycleSignal)
} }
w.terminationSignal.Signal() w.lifecycleSignal.Signal()
if w.callback != nil { if w.callback != nil {
w.callback(false, name, w.terminationSignal) w.callback(false, name, w.lifecycleSignal)
} }
} }
func wrapTerminationSignals(t *testing.T, ts *terminationSignals, callback func(bool, string, terminationSignal)) { func wrapTerminationSignals(t *testing.T, ts *lifecycleSignals, callback func(bool, string, lifecycleSignal)) {
newWrappedTerminationSignal := func(delegated terminationSignal) terminationSignal { newWrappedTerminationSignal := func(delegated lifecycleSignal) lifecycleSignal {
return &wrappedTerminationSignal{ return &wrappedTerminationSignal{
terminationSignal: delegated, lifecycleSignal: delegated,
callback: callback, callback: callback,
} }
} }
@ -116,7 +116,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
// record the termination events in the order they are signaled // record the termination events in the order they are signaled
var signalOrderLock sync.Mutex var signalOrderLock sync.Mutex
signalOrderGot := make([]string, 0) signalOrderGot := make([]string, 0)
recordOrderFn := func(before bool, name string, e terminationSignal) { recordOrderFn := func(before bool, name string, e lifecycleSignal) {
if !before { if !before {
return return
} }
@ -147,7 +147,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second) resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second)
requestMustSucceed(t, resultGot) requestMustSucceed(t, resultGot)
}) })
steps := func(before bool, name string, e terminationSignal) { steps := func(before bool, name string, e lifecycleSignal) {
// Before AfterShutdownDelayDuration event is signaled, the test // Before AfterShutdownDelayDuration event is signaled, the test
// will send request(s) to assert on expected behavior. // will send request(s) to assert on expected behavior.
if name == "AfterShutdownDelayDuration" && before { if name == "AfterShutdownDelayDuration" && before {
@ -157,7 +157,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
} }
// wrap the termination signals of the GenericAPIServer so the test can inject its own callback // wrap the termination signals of the GenericAPIServer so the test can inject its own callback
wrapTerminationSignals(t, &s.terminationSignals, func(before bool, name string, e terminationSignal) { wrapTerminationSignals(t, &s.lifecycleSignals, func(before bool, name string, e lifecycleSignal) {
recordOrderFn(before, name, e) recordOrderFn(before, name, e)
steps(before, name, e) steps(before, name, e)
}) })
@ -192,7 +192,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
} }
// step 4: wait for the HTTP Server listener to have stopped // step 4: wait for the HTTP Server listener to have stopped
httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
select { select {
case <-httpServerStoppedListeningCh.Signaled(): case <-httpServerStoppedListeningCh.Signaled():
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):

View File

@ -16,13 +16,9 @@ limitations under the License.
package server package server
import (
"k8s.io/klog/v2"
)
/* /*
We make an attempt here to identify the events that take place during We make an attempt here to identify the events that take place during
the graceful shutdown of the apiserver. lifecycle of the apiserver.
We also identify each event with a name so we can refer to it. We also identify each event with a name so we can refer to it.
@ -59,49 +55,52 @@ T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have
in flight request(s) have drained. in flight request(s) have drained.
*/ */
// terminationSignal encapsulates a named apiserver termination event // lifecycleSignal encapsulates a named apiserver event
type terminationSignal interface { type lifecycleSignal interface {
// Signal signals the event, indicating that the event has occurred. // Signal signals the event, indicating that the event has occurred.
// Signal is idempotent, once signaled the event stays signaled and // Signal is idempotent, once signaled the event stays signaled and
// it immediately unblocks any goroutine waiting for this event. // it immediately unblocks any goroutine waiting for this event.
Signal() Signal()
// Signaled returns a channel that is closed when the underlying termination // Signaled returns a channel that is closed when the underlying event
// event has been signaled. Successive calls to Signaled return the same value. // has been signaled. Successive calls to Signaled return the same value.
Signaled() <-chan struct{} Signaled() <-chan struct{}
// Name returns the name of the signal, useful for logging.
Name() string
} }
// terminationSignals provides an abstraction of the termination events that // lifecycleSignals provides an abstraction of the events that
// transpire during the shutdown period of the apiserver. This abstraction makes it easy // transpire during the lifecycle of the apiserver. This abstraction makes it easy
// for us to write unit tests that can verify expected graceful termination behavior. // for us to write unit tests that can verify expected graceful termination behavior.
// //
// GenericAPIServer can use these to either: // GenericAPIServer can use these to either:
// - signal that a particular termination event has transpired // - signal that a particular termination event has transpired
// - wait for a designated termination event to transpire and do some action. // - wait for a designated termination event to transpire and do some action.
type terminationSignals struct { type lifecycleSignals struct {
// ShutdownInitiated event is signaled when an apiserver shutdown has been initiated. // ShutdownInitiated event is signaled when an apiserver shutdown has been initiated.
// It is signaled when the `stopCh` provided by the main goroutine // It is signaled when the `stopCh` provided by the main goroutine
// receives a KILL signal and is closed as a consequence. // receives a KILL signal and is closed as a consequence.
ShutdownInitiated terminationSignal ShutdownInitiated lifecycleSignal
// AfterShutdownDelayDuration event is signaled as soon as ShutdownDelayDuration // AfterShutdownDelayDuration event is signaled as soon as ShutdownDelayDuration
// has elapsed since the ShutdownInitiated event. // has elapsed since the ShutdownInitiated event.
// ShutdownDelayDuration allows the apiserver to delay shutdown for some time. // ShutdownDelayDuration allows the apiserver to delay shutdown for some time.
AfterShutdownDelayDuration terminationSignal AfterShutdownDelayDuration lifecycleSignal
// InFlightRequestsDrained event is signaled when the existing requests // InFlightRequestsDrained event is signaled when the existing requests
// in flight have completed. This is used as signal to shut down the audit backends // in flight have completed. This is used as signal to shut down the audit backends
InFlightRequestsDrained terminationSignal InFlightRequestsDrained lifecycleSignal
// HTTPServerStoppedListening termination event is signaled when the // HTTPServerStoppedListening termination event is signaled when the
// HTTP Server has stopped listening to the underlying socket. // HTTP Server has stopped listening to the underlying socket.
HTTPServerStoppedListening terminationSignal HTTPServerStoppedListening lifecycleSignal
} }
// newTerminationSignals returns an instance of terminationSignals interface to be used // newLifecycleSignals returns an instance of lifecycleSignals interface to be used
// to coordinate graceful termination of the apiserver // to coordinate lifecycle of the apiserver
func newTerminationSignals() terminationSignals { func newLifecycleSignals() lifecycleSignals {
return terminationSignals{ return lifecycleSignals{
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"), ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"),
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"), AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"),
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
@ -109,7 +108,7 @@ func newTerminationSignals() terminationSignals {
} }
} }
func newNamedChannelWrapper(name string) terminationSignal { func newNamedChannelWrapper(name string) lifecycleSignal {
return &namedChannelWrapper{ return &namedChannelWrapper{
name: name, name: name,
ch: make(chan struct{}), ch: make(chan struct{}),
@ -127,10 +126,13 @@ func (e *namedChannelWrapper) Signal() {
// already closed, don't close again. // already closed, don't close again.
default: default:
close(e.ch) close(e.ch)
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", e.name)
} }
} }
func (e *namedChannelWrapper) Signaled() <-chan struct{} { func (e *namedChannelWrapper) Signaled() <-chan struct{} {
return e.ch return e.ch
} }
func (e *namedChannelWrapper) Name() string {
return e.name
}