From dae08bc3a735e50845af7cf639bdbb8971a2115a Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 2 Jul 2021 12:28:07 +0200 Subject: [PATCH 1/2] rename terminationSignals to lifecycleSignals --- .../src/k8s.io/apiserver/pkg/server/config.go | 10 +++--- .../apiserver/pkg/server/genericapiserver.go | 15 ++++---- ...ericapiserver_graceful_termination_test.go | 30 ++++++++-------- ...ul_termination.go => lifecycle_signals.go} | 34 +++++++++---------- 4 files changed, 44 insertions(+), 45 deletions(-) rename staging/src/k8s.io/apiserver/pkg/server/{graceful_termination.go => lifecycle_signals.go} (83%) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 8491188e5b1..c4bbe15119e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -225,10 +225,10 @@ type Config struct { // RequestWidthEstimator is used to estimate the "width" of the incoming request(s). RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc - // terminationSignals provides access to the various shutdown signals - // that happen during the graceful termination of the apiserver. + // lifecycleSignals provides access to the various signals + // that happen during lifecycle of the apiserver. // it's intentionally marked private as it should never be overridden. - terminationSignals terminationSignals + lifecycleSignals lifecycleSignals //=========================================================================== // values below here are targets for removal @@ -354,7 +354,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Generic API servers have no inherent long-running subresources LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, - terminationSignals: newTerminationSignals(), + lifecycleSignals: newLifecycleSignals(), APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), @@ -608,7 +608,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G maxRequestBodyBytes: c.MaxRequestBodyBytes, livezClock: clock.RealClock{}, - terminationSignals: c.terminationSignals, + lifecycleSignals: c.lifecycleSignals, APIServerID: c.APIServerID, StorageVersionManager: c.StorageVersionManager, diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 5c4cbd729e3..c665e418830 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -211,9 +211,8 @@ type GenericAPIServer struct { // Version will enable the /version endpoint if non-nil Version *version.Info - // terminationSignals provides access to the various termination - // signals that happen during the shutdown period of the apiserver. - terminationSignals terminationSignals + // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver. + lifecycleSignals lifecycleSignals } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -310,7 +309,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { s.installLivez() // as soon as shutdown is initiated, readiness should start failing - readinessStopCh := s.terminationSignals.ShutdownInitiated.Signaled() + readinessStopCh := s.lifecycleSignals.ShutdownInitiated.Signaled() err := s.addReadyzShutdownCheck(readinessStopCh) if err != nil { klog.Errorf("Failed to install readyz shutdown check %s", err) @@ -334,8 +333,8 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // Run spawns the secure http server. It only returns if stopCh is closed // or the secure port cannot be listened on initially. func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { - delayedStopCh := s.terminationSignals.AfterShutdownDelayDuration - shutdownInitiatedCh := s.terminationSignals.ShutdownInitiated + delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration + shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated go func() { defer delayedStopCh.Signal() @@ -355,13 +354,13 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { if err != nil { return err } - httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening + httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening go func() { <-listenerStoppedCh httpServerStoppedListeningCh.Signal() }() - drainedCh := s.terminationSignals.InFlightRequestsDrained + drainedCh := s.lifecycleSignals.InFlightRequestsDrained go func() { defer drainedCh.Signal() diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index 7c30add2203..cb130186aab 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -52,33 +52,33 @@ type result struct { response *http.Response } -// wrap a terminationSignal so the test can inject its own callback +// wrap a lifecycleSignal so the test can inject its own callback type wrappedTerminationSignal struct { - terminationSignal - callback func(bool, string, terminationSignal) + lifecycleSignal + callback func(bool, string, lifecycleSignal) } func (w *wrappedTerminationSignal) Signal() { var name string - if ncw, ok := w.terminationSignal.(*namedChannelWrapper); ok { + if ncw, ok := w.lifecycleSignal.(*namedChannelWrapper); ok { name = ncw.name } // the callback is invoked before and after the termination event is signaled if w.callback != nil { - w.callback(true, name, w.terminationSignal) + w.callback(true, name, w.lifecycleSignal) } - w.terminationSignal.Signal() + w.lifecycleSignal.Signal() if w.callback != nil { - w.callback(false, name, w.terminationSignal) + w.callback(false, name, w.lifecycleSignal) } } -func wrapTerminationSignals(t *testing.T, ts *terminationSignals, callback func(bool, string, terminationSignal)) { - newWrappedTerminationSignal := func(delegated terminationSignal) terminationSignal { +func wrapTerminationSignals(t *testing.T, ts *lifecycleSignals, callback func(bool, string, lifecycleSignal)) { + newWrappedTerminationSignal := func(delegated lifecycleSignal) lifecycleSignal { return &wrappedTerminationSignal{ - terminationSignal: delegated, - callback: callback, + lifecycleSignal: delegated, + callback: callback, } } @@ -116,7 +116,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t // record the termination events in the order they are signaled var signalOrderLock sync.Mutex signalOrderGot := make([]string, 0) - recordOrderFn := func(before bool, name string, e terminationSignal) { + recordOrderFn := func(before bool, name string, e lifecycleSignal) { if !before { return } @@ -147,7 +147,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t resultGot = doer.Do(newClient(true), shouldUseNewConnection(t), "/echo?message=request-on-a-new-tcp-connection-should-succeed", time.Second) requestMustSucceed(t, resultGot) }) - steps := func(before bool, name string, e terminationSignal) { + steps := func(before bool, name string, e lifecycleSignal) { // Before AfterShutdownDelayDuration event is signaled, the test // will send request(s) to assert on expected behavior. if name == "AfterShutdownDelayDuration" && before { @@ -157,7 +157,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t } // wrap the termination signals of the GenericAPIServer so the test can inject its own callback - wrapTerminationSignals(t, &s.terminationSignals, func(before bool, name string, e terminationSignal) { + wrapTerminationSignals(t, &s.lifecycleSignals, func(before bool, name string, e lifecycleSignal) { recordOrderFn(before, name, e) steps(before, name, e) }) @@ -192,7 +192,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t } // step 4: wait for the HTTP Server listener to have stopped - httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening + httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening select { case <-httpServerStoppedListeningCh.Signaled(): case <-time.After(5 * time.Second): diff --git a/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go similarity index 83% rename from staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go rename to staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index 817ba074ebd..a4e44e986fe 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/graceful_termination.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -22,7 +22,7 @@ import ( /* We make an attempt here to identify the events that take place during -the graceful shutdown of the apiserver. +lifecycle of the apiserver. We also identify each event with a name so we can refer to it. @@ -59,49 +59,49 @@ T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have in flight request(s) have drained. */ -// terminationSignal encapsulates a named apiserver termination event -type terminationSignal interface { +// lifecycleSignal encapsulates a named apiserver event +type lifecycleSignal interface { // Signal signals the event, indicating that the event has occurred. // Signal is idempotent, once signaled the event stays signaled and // it immediately unblocks any goroutine waiting for this event. Signal() - // Signaled returns a channel that is closed when the underlying termination - // event has been signaled. Successive calls to Signaled return the same value. + // Signaled returns a channel that is closed when the underlying event + // has been signaled. Successive calls to Signaled return the same value. Signaled() <-chan struct{} } -// terminationSignals provides an abstraction of the termination events that -// transpire during the shutdown period of the apiserver. This abstraction makes it easy +// lifecycleSignals provides an abstraction of the events that +// transpire during the lifecycle of the apiserver. This abstraction makes it easy // for us to write unit tests that can verify expected graceful termination behavior. // // GenericAPIServer can use these to either: // - signal that a particular termination event has transpired // - wait for a designated termination event to transpire and do some action. -type terminationSignals struct { +type lifecycleSignals struct { // ShutdownInitiated event is signaled when an apiserver shutdown has been initiated. // It is signaled when the `stopCh` provided by the main goroutine // receives a KILL signal and is closed as a consequence. - ShutdownInitiated terminationSignal + ShutdownInitiated lifecycleSignal // AfterShutdownDelayDuration event is signaled as soon as ShutdownDelayDuration // has elapsed since the ShutdownInitiated event. // ShutdownDelayDuration allows the apiserver to delay shutdown for some time. - AfterShutdownDelayDuration terminationSignal + AfterShutdownDelayDuration lifecycleSignal // InFlightRequestsDrained event is signaled when the existing requests // in flight have completed. This is used as signal to shut down the audit backends - InFlightRequestsDrained terminationSignal + InFlightRequestsDrained lifecycleSignal // HTTPServerStoppedListening termination event is signaled when the // HTTP Server has stopped listening to the underlying socket. - HTTPServerStoppedListening terminationSignal + HTTPServerStoppedListening lifecycleSignal } -// newTerminationSignals returns an instance of terminationSignals interface to be used -// to coordinate graceful termination of the apiserver -func newTerminationSignals() terminationSignals { - return terminationSignals{ +// newLifecycleSignals returns an instance of lifecycleSignals interface to be used +// to coordinate lifecycle of the apiserver +func newLifecycleSignals() lifecycleSignals { + return lifecycleSignals{ ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"), AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"), InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), @@ -109,7 +109,7 @@ func newTerminationSignals() terminationSignals { } } -func newNamedChannelWrapper(name string) terminationSignal { +func newNamedChannelWrapper(name string) lifecycleSignal { return &namedChannelWrapper{ name: name, ch: make(chan struct{}), From 6c88a62cb4c849e3844dcc3870073a1b5e05d301 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 2 Jul 2021 12:50:20 +0200 Subject: [PATCH 2/2] remove logging from the Signal method --- .../k8s.io/apiserver/pkg/server/genericapiserver.go | 4 ++++ .../k8s.io/apiserver/pkg/server/lifecycle_signals.go | 12 +++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index c665e418830..5f4e8e348a8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -338,6 +338,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { go func() { defer delayedStopCh.Signal() + defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name()) <-stopCh @@ -345,6 +346,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red // and stop sending traffic to this server. shutdownInitiatedCh.Signal() + klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name()) time.Sleep(s.ShutdownDelayDuration) }() @@ -358,11 +360,13 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { go func() { <-listenerStoppedCh httpServerStoppedListeningCh.Signal() + klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name()) }() drainedCh := s.lifecycleSignals.InFlightRequestsDrained go func() { defer drainedCh.Signal() + defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name()) // wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called). <-delayedStopCh.Signaled() diff --git a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index a4e44e986fe..2297a776ed4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -16,10 +16,6 @@ limitations under the License. package server -import ( - "k8s.io/klog/v2" -) - /* We make an attempt here to identify the events that take place during lifecycle of the apiserver. @@ -69,6 +65,9 @@ type lifecycleSignal interface { // Signaled returns a channel that is closed when the underlying event // has been signaled. Successive calls to Signaled return the same value. Signaled() <-chan struct{} + + // Name returns the name of the signal, useful for logging. + Name() string } // lifecycleSignals provides an abstraction of the events that @@ -127,10 +126,13 @@ func (e *namedChannelWrapper) Signal() { // already closed, don't close again. default: close(e.ch) - klog.V(1).InfoS("[graceful-termination] shutdown event", "name", e.name) } } func (e *namedChannelWrapper) Signaled() <-chan struct{} { return e.ch } + +func (e *namedChannelWrapper) Name() string { + return e.name +}