From ddfbb5d2bb57ee44b3e10f0b58f9cc7001f55802 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 15 Oct 2021 18:14:20 +0200 Subject: [PATCH] genericapiserver: indroduce muxCompleteSignals for holding signals that indicate all known HTTP paths have been registered the new field exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. it is exposed for easier composition of the individual servers. the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler. --- .../src/k8s.io/apiserver/pkg/server/config.go | 18 ++++++-- .../apiserver/pkg/server/config_test.go | 1 + .../apiserver/pkg/server/genericapiserver.go | 45 +++++++++++++++++++ ...ericapiserver_graceful_termination_test.go | 41 +++++++++++++++++ .../pkg/server/genericapiserver_test.go | 33 ++++++++++++++ .../apiserver/pkg/server/lifecycle_signals.go | 6 +++ 6 files changed, 140 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 524c8c0f5ae..3e3ad59daf4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -578,6 +578,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G handlerChainBuilder := func(handler http.Handler) http.Handler { return c.BuildHandlerChainFunc(handler, c.Config) } + apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()) s := &GenericAPIServer{ @@ -591,6 +592,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G delegationTarget: delegationTarget, EquivalentResourceRegistry: c.EquivalentResourceRegistry, HandlerChainWaitGroup: c.HandlerChainWaitGroup, + Handler: apiServerHandler, + + listedPathProvider: apiServerHandler, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, ShutdownTimeout: c.RequestTimeout, @@ -598,10 +602,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G SecureServingInfo: c.SecureServing, ExternalAddress: c.ExternalAddress, - Handler: apiServerHandler, - - listedPathProvider: apiServerHandler, - openAPIConfig: c.OpenAPIConfig, skipOpenAPIInstallation: c.SkipOpenAPIInstallation, @@ -626,6 +626,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G StorageVersionManager: c.StorageVersionManager, Version: c.Version, + + muxCompleteSignals: map[string]<-chan struct{}{}, } for { @@ -657,6 +659,13 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } } + // register mux signals from the delegated server + for k, v := range delegationTarget.MuxCompleteSignals() { + if err := s.RegisterMuxCompleteSignal(k, v); err != nil { + return nil, err + } + } + genericApiServerHookName := "generic-apiserver-start-informers" if c.SharedInformerFactory != nil { if !s.isPostStartHookRegistered(genericApiServerHookName) { @@ -807,6 +816,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) handler = genericapifilters.WithRequestReceivedTimestamp(handler) + handler = genericapifilters.WithMuxCompleteProtection(handler, c.lifecycleSignals.MuxComplete.Signaled()) handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver) handler = genericapifilters.WithAuditID(handler) return handler diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 01c3cbafcb8..67bf529f9f3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -298,6 +298,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { RequestInfoResolver: &request.RequestInfoFactory{}, RequestTimeout: 10 * time.Second, LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false }, + lifecycleSignals: newLifecycleSignals(), } h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index e67f5fbebad..4f326415d0e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -214,6 +214,12 @@ type GenericAPIServer struct { // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver. lifecycleSignals lifecycleSignals + // muxCompleteSignals holds signals that indicate all known HTTP paths have been registered. + // it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. + // it is exposed for easier composition of the individual servers. + // the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler + muxCompleteSignals map[string]<-chan struct{} + // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP // Server during the graceful termination of the apiserver. If true, we wait // for non longrunning requests in flight to be drained and then initiate a @@ -247,6 +253,9 @@ type DelegationTarget interface { // PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates. PrepareRun() preparedGenericAPIServer + + // MuxCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed. + MuxCompleteSignals() map[string]<-chan struct{} } func (s *GenericAPIServer) UnprotectedHandler() http.Handler { @@ -270,7 +279,23 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget { return s.delegationTarget } +// RegisterMuxCompleteSignal registers the given signal that will be used to determine if all known +// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running. +func (s *GenericAPIServer) RegisterMuxCompleteSignal(signalName string, signal <-chan struct{}) error { + if _, exists := s.muxCompleteSignals[signalName]; exists { + return fmt.Errorf("%s already registered", signalName) + } + s.muxCompleteSignals[signalName] = signal + return nil +} + +func (s *GenericAPIServer) MuxCompleteSignals() map[string]<-chan struct{} { + return s.muxCompleteSignals +} + type emptyDelegate struct { + // handler is called at the end of the delegation chain + // when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler. handler http.Handler } @@ -304,6 +329,9 @@ func (s emptyDelegate) NextDelegate() DelegationTarget { func (s emptyDelegate) PrepareRun() preparedGenericAPIServer { return preparedGenericAPIServer{nil} } +func (s emptyDelegate) MuxCompleteSignals() map[string]<-chan struct{} { + return map[string]<-chan struct{}{} +} // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked. type preparedGenericAPIServer struct { @@ -351,6 +379,23 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated + // spawn a new goroutine for closing the MuxComplete signal + // registration happens during construction of the generic api server + // the last server in the chain aggregates signals from the previous instances + go func() { + for _, muxInstalledSignal := range s.GenericAPIServer.MuxCompleteSignals() { + select { + case <-muxInstalledSignal: + continue + case <-stopCh: + klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxComplete.Name()) + return + } + } + s.lifecycleSignals.MuxComplete.Signal() + klog.V(1).Infof("%s completed, all registered mux complete signals (%d) have finished", s.lifecycleSignals.MuxComplete.Name(), s.GenericAPIServer.MuxCompleteSignals()) + }() + go func() { defer delayedStopCh.Signal() defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name()) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index d7956c9f64e..2f462408b55 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -379,6 +379,47 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t }() } +func TestMuxComplete(t *testing.T) { + // setup + testSignal := make(chan struct{}) + testSignal2 := make(chan struct{}) + s := newGenericAPIServer(t, true) + s.muxCompleteSignals["TestSignal"] = testSignal + s.muxCompleteSignals["TestSignal2"] = testSignal2 + doer := setupDoer(t, s.SecureServingInfo) + isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool { + time.Sleep(delay) + select { + case <-ch: + return true + default: + return false + } + } + + // start the API server + stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(runCompletedCh) + s.PrepareRun().Run(stopCh) + }() + waitForAPIServerStarted(t, doer) + + // act + if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxComplete.Name()) + } + + close(testSignal) + if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxComplete.Name()) + } + + close(testSignal2) + if !isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxComplete.Name()) + } +} func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) { return func(ci httptrace.GotConnInfo) { if !ci.Reused { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 17f8d810409..29841aa6de0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -485,6 +485,39 @@ func TestNotRestRoutesHaveAuth(t *testing.T) { } } +func TestMuxCompleteSignals(t *testing.T) { + // setup + cfg, assert := setUp(t) + + // scenario 1: single server with some mux signals + root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate()) + assert.NoError(err) + if len(root.MuxCompleteSignals()) != 0 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals())) + } + root.RegisterMuxCompleteSignal("rootTestSignal", make(chan struct{})) + if len(root.MuxCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals())) + } + + // scenario 2: multiple servers with some mux signals + delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate()) + assert.NoError(err) + delegate.RegisterMuxCompleteSignal("delegateTestSignal", make(chan struct{})) + if len(delegate.MuxCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxCompleteSignals())) + } + newRoot, err := cfg.Complete(nil).New("newRootServer", delegate) + assert.NoError(err) + if len(newRoot.MuxCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals())) + } + newRoot.RegisterMuxCompleteSignal("newRootTestSignal", make(chan struct{})) + if len(newRoot.MuxCompleteSignals()) != 2 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals())) + } +} + type mockAuthorizer struct { lastURI string } diff --git a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index 1a939e24fe4..bcd7fd33cf7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -130,6 +130,11 @@ type lifecycleSignals struct { // HasBeenReady is signaled when the readyz endpoint succeeds for the first time. HasBeenReady lifecycleSignal + + // MuxComplete is signaled when all known HTTP paths have been installed. + // It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. + // The actual logic is implemented by an APIServer using the generic server library. + MuxComplete lifecycleSignal } // newLifecycleSignals returns an instance of lifecycleSignals interface to be used @@ -141,6 +146,7 @@ func newLifecycleSignals() lifecycleSignals { InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"), HasBeenReady: newNamedChannelWrapper("HasBeenReady"), + MuxComplete: newNamedChannelWrapper("MuxComplete"), } }