mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
genericapiserver: indroduce muxCompleteSignals for holding signals that indicate all known HTTP paths have been registered
the new field exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. it is exposed for easier composition of the individual servers. the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler.
This commit is contained in:
parent
b71fa61b79
commit
ddfbb5d2bb
@ -578,6 +578,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
handlerChainBuilder := func(handler http.Handler) http.Handler {
|
handlerChainBuilder := func(handler http.Handler) http.Handler {
|
||||||
return c.BuildHandlerChainFunc(handler, c.Config)
|
return c.BuildHandlerChainFunc(handler, c.Config)
|
||||||
}
|
}
|
||||||
|
|
||||||
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
|
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
|
||||||
|
|
||||||
s := &GenericAPIServer{
|
s := &GenericAPIServer{
|
||||||
@ -591,6 +592,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
delegationTarget: delegationTarget,
|
delegationTarget: delegationTarget,
|
||||||
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
|
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
|
||||||
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
|
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
|
||||||
|
Handler: apiServerHandler,
|
||||||
|
|
||||||
|
listedPathProvider: apiServerHandler,
|
||||||
|
|
||||||
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
||||||
ShutdownTimeout: c.RequestTimeout,
|
ShutdownTimeout: c.RequestTimeout,
|
||||||
@ -598,10 +602,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
SecureServingInfo: c.SecureServing,
|
SecureServingInfo: c.SecureServing,
|
||||||
ExternalAddress: c.ExternalAddress,
|
ExternalAddress: c.ExternalAddress,
|
||||||
|
|
||||||
Handler: apiServerHandler,
|
|
||||||
|
|
||||||
listedPathProvider: apiServerHandler,
|
|
||||||
|
|
||||||
openAPIConfig: c.OpenAPIConfig,
|
openAPIConfig: c.OpenAPIConfig,
|
||||||
skipOpenAPIInstallation: c.SkipOpenAPIInstallation,
|
skipOpenAPIInstallation: c.SkipOpenAPIInstallation,
|
||||||
|
|
||||||
@ -626,6 +626,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
StorageVersionManager: c.StorageVersionManager,
|
StorageVersionManager: c.StorageVersionManager,
|
||||||
|
|
||||||
Version: c.Version,
|
Version: c.Version,
|
||||||
|
|
||||||
|
muxCompleteSignals: map[string]<-chan struct{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -657,6 +659,13 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// register mux signals from the delegated server
|
||||||
|
for k, v := range delegationTarget.MuxCompleteSignals() {
|
||||||
|
if err := s.RegisterMuxCompleteSignal(k, v); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
genericApiServerHookName := "generic-apiserver-start-informers"
|
genericApiServerHookName := "generic-apiserver-start-informers"
|
||||||
if c.SharedInformerFactory != nil {
|
if c.SharedInformerFactory != nil {
|
||||||
if !s.isPostStartHookRegistered(genericApiServerHookName) {
|
if !s.isPostStartHookRegistered(genericApiServerHookName) {
|
||||||
@ -807,6 +816,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
}
|
}
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
||||||
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
|
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
|
||||||
|
handler = genericapifilters.WithMuxCompleteProtection(handler, c.lifecycleSignals.MuxComplete.Signaled())
|
||||||
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
|
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
|
||||||
handler = genericapifilters.WithAuditID(handler)
|
handler = genericapifilters.WithAuditID(handler)
|
||||||
return handler
|
return handler
|
||||||
|
@ -298,6 +298,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
|
|||||||
RequestInfoResolver: &request.RequestInfoFactory{},
|
RequestInfoResolver: &request.RequestInfoFactory{},
|
||||||
RequestTimeout: 10 * time.Second,
|
RequestTimeout: 10 * time.Second,
|
||||||
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
|
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
|
||||||
|
lifecycleSignals: newLifecycleSignals(),
|
||||||
}
|
}
|
||||||
|
|
||||||
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -214,6 +214,12 @@ type GenericAPIServer struct {
|
|||||||
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
|
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
|
||||||
lifecycleSignals lifecycleSignals
|
lifecycleSignals lifecycleSignals
|
||||||
|
|
||||||
|
// muxCompleteSignals holds signals that indicate all known HTTP paths have been registered.
|
||||||
|
// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
||||||
|
// it is exposed for easier composition of the individual servers.
|
||||||
|
// the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler
|
||||||
|
muxCompleteSignals map[string]<-chan struct{}
|
||||||
|
|
||||||
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
|
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
|
||||||
// Server during the graceful termination of the apiserver. If true, we wait
|
// Server during the graceful termination of the apiserver. If true, we wait
|
||||||
// for non longrunning requests in flight to be drained and then initiate a
|
// for non longrunning requests in flight to be drained and then initiate a
|
||||||
@ -247,6 +253,9 @@ type DelegationTarget interface {
|
|||||||
|
|
||||||
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
|
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
|
||||||
PrepareRun() preparedGenericAPIServer
|
PrepareRun() preparedGenericAPIServer
|
||||||
|
|
||||||
|
// MuxCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
|
||||||
|
MuxCompleteSignals() map[string]<-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
||||||
@ -270,7 +279,23 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget {
|
|||||||
return s.delegationTarget
|
return s.delegationTarget
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterMuxCompleteSignal registers the given signal that will be used to determine if all known
|
||||||
|
// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running.
|
||||||
|
func (s *GenericAPIServer) RegisterMuxCompleteSignal(signalName string, signal <-chan struct{}) error {
|
||||||
|
if _, exists := s.muxCompleteSignals[signalName]; exists {
|
||||||
|
return fmt.Errorf("%s already registered", signalName)
|
||||||
|
}
|
||||||
|
s.muxCompleteSignals[signalName] = signal
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GenericAPIServer) MuxCompleteSignals() map[string]<-chan struct{} {
|
||||||
|
return s.muxCompleteSignals
|
||||||
|
}
|
||||||
|
|
||||||
type emptyDelegate struct {
|
type emptyDelegate struct {
|
||||||
|
// handler is called at the end of the delegation chain
|
||||||
|
// when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler.
|
||||||
handler http.Handler
|
handler http.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -304,6 +329,9 @@ func (s emptyDelegate) NextDelegate() DelegationTarget {
|
|||||||
func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
||||||
return preparedGenericAPIServer{nil}
|
return preparedGenericAPIServer{nil}
|
||||||
}
|
}
|
||||||
|
func (s emptyDelegate) MuxCompleteSignals() map[string]<-chan struct{} {
|
||||||
|
return map[string]<-chan struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
|
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
|
||||||
type preparedGenericAPIServer struct {
|
type preparedGenericAPIServer struct {
|
||||||
@ -351,6 +379,23 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
||||||
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
||||||
|
|
||||||
|
// spawn a new goroutine for closing the MuxComplete signal
|
||||||
|
// registration happens during construction of the generic api server
|
||||||
|
// the last server in the chain aggregates signals from the previous instances
|
||||||
|
go func() {
|
||||||
|
for _, muxInstalledSignal := range s.GenericAPIServer.MuxCompleteSignals() {
|
||||||
|
select {
|
||||||
|
case <-muxInstalledSignal:
|
||||||
|
continue
|
||||||
|
case <-stopCh:
|
||||||
|
klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxComplete.Name())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.lifecycleSignals.MuxComplete.Signal()
|
||||||
|
klog.V(1).Infof("%s completed, all registered mux complete signals (%d) have finished", s.lifecycleSignals.MuxComplete.Name(), s.GenericAPIServer.MuxCompleteSignals())
|
||||||
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer delayedStopCh.Signal()
|
defer delayedStopCh.Signal()
|
||||||
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
|
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
|
||||||
|
@ -379,6 +379,47 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMuxComplete(t *testing.T) {
|
||||||
|
// setup
|
||||||
|
testSignal := make(chan struct{})
|
||||||
|
testSignal2 := make(chan struct{})
|
||||||
|
s := newGenericAPIServer(t, true)
|
||||||
|
s.muxCompleteSignals["TestSignal"] = testSignal
|
||||||
|
s.muxCompleteSignals["TestSignal2"] = testSignal2
|
||||||
|
doer := setupDoer(t, s.SecureServingInfo)
|
||||||
|
isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool {
|
||||||
|
time.Sleep(delay)
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the API server
|
||||||
|
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(runCompletedCh)
|
||||||
|
s.PrepareRun().Run(stopCh)
|
||||||
|
}()
|
||||||
|
waitForAPIServerStarted(t, doer)
|
||||||
|
|
||||||
|
// act
|
||||||
|
if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) {
|
||||||
|
t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxComplete.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
close(testSignal)
|
||||||
|
if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) {
|
||||||
|
t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxComplete.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
close(testSignal2)
|
||||||
|
if !isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) {
|
||||||
|
t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxComplete.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
||||||
return func(ci httptrace.GotConnInfo) {
|
return func(ci httptrace.GotConnInfo) {
|
||||||
if !ci.Reused {
|
if !ci.Reused {
|
||||||
|
@ -485,6 +485,39 @@ func TestNotRestRoutesHaveAuth(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMuxCompleteSignals(t *testing.T) {
|
||||||
|
// setup
|
||||||
|
cfg, assert := setUp(t)
|
||||||
|
|
||||||
|
// scenario 1: single server with some mux signals
|
||||||
|
root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate())
|
||||||
|
assert.NoError(err)
|
||||||
|
if len(root.MuxCompleteSignals()) != 0 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals()))
|
||||||
|
}
|
||||||
|
root.RegisterMuxCompleteSignal("rootTestSignal", make(chan struct{}))
|
||||||
|
if len(root.MuxCompleteSignals()) != 1 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// scenario 2: multiple servers with some mux signals
|
||||||
|
delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate())
|
||||||
|
assert.NoError(err)
|
||||||
|
delegate.RegisterMuxCompleteSignal("delegateTestSignal", make(chan struct{}))
|
||||||
|
if len(delegate.MuxCompleteSignals()) != 1 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxCompleteSignals()))
|
||||||
|
}
|
||||||
|
newRoot, err := cfg.Complete(nil).New("newRootServer", delegate)
|
||||||
|
assert.NoError(err)
|
||||||
|
if len(newRoot.MuxCompleteSignals()) != 1 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals()))
|
||||||
|
}
|
||||||
|
newRoot.RegisterMuxCompleteSignal("newRootTestSignal", make(chan struct{}))
|
||||||
|
if len(newRoot.MuxCompleteSignals()) != 2 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type mockAuthorizer struct {
|
type mockAuthorizer struct {
|
||||||
lastURI string
|
lastURI string
|
||||||
}
|
}
|
||||||
|
@ -130,6 +130,11 @@ type lifecycleSignals struct {
|
|||||||
|
|
||||||
// HasBeenReady is signaled when the readyz endpoint succeeds for the first time.
|
// HasBeenReady is signaled when the readyz endpoint succeeds for the first time.
|
||||||
HasBeenReady lifecycleSignal
|
HasBeenReady lifecycleSignal
|
||||||
|
|
||||||
|
// MuxComplete is signaled when all known HTTP paths have been installed.
|
||||||
|
// It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
||||||
|
// The actual logic is implemented by an APIServer using the generic server library.
|
||||||
|
MuxComplete lifecycleSignal
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
|
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
|
||||||
@ -141,6 +146,7 @@ func newLifecycleSignals() lifecycleSignals {
|
|||||||
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
||||||
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
|
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
|
||||||
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),
|
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),
|
||||||
|
MuxComplete: newNamedChannelWrapper("MuxComplete"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user