mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
rename to muxAndDiscoveryComplete
This commit is contained in:
parent
c54463d379
commit
9e2bdfee02
@ -190,7 +190,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.HasMuxCompleteProtectionKey)
|
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
|
||||||
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
|
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -21,21 +21,21 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
type muxCompleteProtectionKeyType int
|
type muxAndDiscoveryIncompleteKeyType int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// muxCompleteProtectionKey is a key under which a protection signal for all requests made before the server have installed all known HTTP paths is stored in the request's context
|
// muxAndDiscoveryIncompleteKey is a key under which a protection signal for all requests made before the server have installed all known HTTP paths is stored in the request's context
|
||||||
muxCompleteProtectionKey muxCompleteProtectionKeyType = iota
|
muxAndDiscoveryIncompleteKey muxAndDiscoveryIncompleteKeyType = iota
|
||||||
)
|
)
|
||||||
|
|
||||||
// HasMuxCompleteProtectionKey checks if the context contains muxCompleteProtectionKey.
|
// NoMuxAndDiscoveryIncompleteKey checks if the context contains muxAndDiscoveryIncompleteKey.
|
||||||
// The presence of the key indicates the request has been made when the HTTP paths weren't installed.
|
// The presence of the key indicates the request has been made when the HTTP paths weren't installed.
|
||||||
func HasMuxCompleteProtectionKey(ctx context.Context) bool {
|
func NoMuxAndDiscoveryIncompleteKey(ctx context.Context) bool {
|
||||||
muxCompleteProtectionKeyValue, _ := ctx.Value(muxCompleteProtectionKey).(string)
|
muxAndDiscoveryCompleteProtectionKeyValue, _ := ctx.Value(muxAndDiscoveryIncompleteKey).(string)
|
||||||
return len(muxCompleteProtectionKeyValue) != 0
|
return len(muxAndDiscoveryCompleteProtectionKeyValue) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMuxCompleteProtection puts the muxCompleteProtectionKey in the context if a request has been made before muxCompleteSignal has been ready.
|
// WithMuxAndDiscoveryComplete puts the muxAndDiscoveryIncompleteKey in the context if a request has been made before muxAndDiscoveryCompleteSignal has been ready.
|
||||||
// Putting the key protect us from returning a 404 response instead of a 503.
|
// Putting the key protect us from returning a 404 response instead of a 503.
|
||||||
// It is especially important for controllers like GC and NS since they act on 404s.
|
// It is especially important for controllers like GC and NS since they act on 404s.
|
||||||
//
|
//
|
||||||
@ -44,10 +44,10 @@ func HasMuxCompleteProtectionKey(ctx context.Context) bool {
|
|||||||
// The race may happen when a request reaches the NotFoundHandler because not all paths have been registered in the mux
|
// The race may happen when a request reaches the NotFoundHandler because not all paths have been registered in the mux
|
||||||
// but when the registered checks are examined in the handler they indicate that the paths have been actually installed.
|
// but when the registered checks are examined in the handler they indicate that the paths have been actually installed.
|
||||||
// In that case, the presence of the key will make the handler return 503 instead of 404.
|
// In that case, the presence of the key will make the handler return 503 instead of 404.
|
||||||
func WithMuxCompleteProtection(handler http.Handler, muxCompleteSignal <-chan struct{}) http.Handler {
|
func WithMuxAndDiscoveryComplete(handler http.Handler, muxAndDiscoveryCompleteSignal <-chan struct{}) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
if muxCompleteSignal != nil && !isClosed(muxCompleteSignal) {
|
if muxAndDiscoveryCompleteSignal != nil && !isClosed(muxAndDiscoveryCompleteSignal) {
|
||||||
req = req.WithContext(context.WithValue(req.Context(), muxCompleteProtectionKey, "MuxInstallationNotComplete"))
|
req = req.WithContext(context.WithValue(req.Context(), muxAndDiscoveryIncompleteKey, "MuxAndDiscoveryInstallationNotComplete"))
|
||||||
}
|
}
|
||||||
handler.ServeHTTP(w, req)
|
handler.ServeHTTP(w, req)
|
||||||
})
|
})
|
@ -23,23 +23,24 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWithMuxCompleteProtectionFilter(t *testing.T) {
|
func TestWithMuxAndDiscoveryCompleteProtection(t *testing.T) {
|
||||||
scenarios := []struct {
|
scenarios := []struct {
|
||||||
name string
|
name string
|
||||||
muxCompleteSignal <-chan struct{}
|
muxAndDiscoveryCompleteSignal <-chan struct{}
|
||||||
expectMuxCompleteProtectionKey bool
|
expectNoMuxAndDiscoIncompleteKey bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no signals, no protection key in the ctx",
|
name: "no signals, no key in the ctx",
|
||||||
|
expectNoMuxAndDiscoIncompleteKey: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "signal ready, no protection key in the ctx",
|
name: "signal ready, no key in the ctx",
|
||||||
muxCompleteSignal: func() chan struct{} { ch := make(chan struct{}); close(ch); return ch }(),
|
muxAndDiscoveryCompleteSignal: func() chan struct{} { ch := make(chan struct{}); close(ch); return ch }(),
|
||||||
|
expectNoMuxAndDiscoIncompleteKey: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "signal not ready, the protection key in the ctx",
|
name: "signal not ready, the key in the ctx",
|
||||||
muxCompleteSignal: make(chan struct{}),
|
muxAndDiscoveryCompleteSignal: make(chan struct{}),
|
||||||
expectMuxCompleteProtectionKey: true,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,15 +51,15 @@ func TestWithMuxCompleteProtectionFilter(t *testing.T) {
|
|||||||
delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
actualContext = req.Context()
|
actualContext = req.Context()
|
||||||
})
|
})
|
||||||
target := WithMuxCompleteProtection(delegate, scenario.muxCompleteSignal)
|
target := WithMuxAndDiscoveryComplete(delegate, scenario.muxAndDiscoveryCompleteSignal)
|
||||||
|
|
||||||
// act
|
// act
|
||||||
req := &http.Request{}
|
req := &http.Request{}
|
||||||
target.ServeHTTP(httptest.NewRecorder(), req)
|
target.ServeHTTP(httptest.NewRecorder(), req)
|
||||||
|
|
||||||
// validate
|
// validate
|
||||||
if scenario.expectMuxCompleteProtectionKey != HasMuxCompleteProtectionKey(actualContext) {
|
if scenario.expectNoMuxAndDiscoIncompleteKey != NoMuxAndDiscoveryIncompleteKey(actualContext) {
|
||||||
t.Fatalf("expectMuxCompleteProtectionKey in the context = %v, does the actual context contain the key = %v", scenario.expectMuxCompleteProtectionKey, HasMuxCompleteProtectionKey(actualContext))
|
t.Fatalf("expectNoMuxAndDiscoIncompleteKey in the context = %v, does the actual context contain the key = %v", scenario.expectNoMuxAndDiscoIncompleteKey, NoMuxAndDiscoveryIncompleteKey(actualContext))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
@ -627,7 +627,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
|
|
||||||
Version: c.Version,
|
Version: c.Version,
|
||||||
|
|
||||||
muxCompleteSignals: map[string]<-chan struct{}{},
|
muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -660,8 +660,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
}
|
}
|
||||||
|
|
||||||
// register mux signals from the delegated server
|
// register mux signals from the delegated server
|
||||||
for k, v := range delegationTarget.MuxCompleteSignals() {
|
for k, v := range delegationTarget.MuxAndDiscoveryCompleteSignals() {
|
||||||
if err := s.RegisterMuxCompleteSignal(k, v); err != nil {
|
if err := s.RegisterMuxAndDiscoveryCompleteSignal(k, v); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -816,7 +816,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
}
|
}
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
||||||
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
|
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
|
||||||
handler = genericapifilters.WithMuxCompleteProtection(handler, c.lifecycleSignals.MuxComplete.Signaled())
|
handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
|
||||||
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
|
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
|
||||||
handler = genericapifilters.WithAuditID(handler)
|
handler = genericapifilters.WithAuditID(handler)
|
||||||
return handler
|
return handler
|
||||||
|
@ -214,11 +214,11 @@ type GenericAPIServer struct {
|
|||||||
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
|
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
|
||||||
lifecycleSignals lifecycleSignals
|
lifecycleSignals lifecycleSignals
|
||||||
|
|
||||||
// muxCompleteSignals holds signals that indicate all known HTTP paths have been registered.
|
// muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered.
|
||||||
// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
||||||
// it is exposed for easier composition of the individual servers.
|
// it is exposed for easier composition of the individual servers.
|
||||||
// the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler
|
// the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler
|
||||||
muxCompleteSignals map[string]<-chan struct{}
|
muxAndDiscoveryCompleteSignals map[string]<-chan struct{}
|
||||||
|
|
||||||
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
|
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
|
||||||
// Server during the graceful termination of the apiserver. If true, we wait
|
// Server during the graceful termination of the apiserver. If true, we wait
|
||||||
@ -254,8 +254,8 @@ type DelegationTarget interface {
|
|||||||
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
|
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
|
||||||
PrepareRun() preparedGenericAPIServer
|
PrepareRun() preparedGenericAPIServer
|
||||||
|
|
||||||
// MuxCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
|
// MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
|
||||||
MuxCompleteSignals() map[string]<-chan struct{}
|
MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
||||||
@ -279,18 +279,18 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget {
|
|||||||
return s.delegationTarget
|
return s.delegationTarget
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterMuxCompleteSignal registers the given signal that will be used to determine if all known
|
// RegisterMuxAndDiscoveryCompleteSignal registers the given signal that will be used to determine if all known
|
||||||
// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running.
|
// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running.
|
||||||
func (s *GenericAPIServer) RegisterMuxCompleteSignal(signalName string, signal <-chan struct{}) error {
|
func (s *GenericAPIServer) RegisterMuxAndDiscoveryCompleteSignal(signalName string, signal <-chan struct{}) error {
|
||||||
if _, exists := s.muxCompleteSignals[signalName]; exists {
|
if _, exists := s.muxAndDiscoveryCompleteSignals[signalName]; exists {
|
||||||
return fmt.Errorf("%s already registered", signalName)
|
return fmt.Errorf("%s already registered", signalName)
|
||||||
}
|
}
|
||||||
s.muxCompleteSignals[signalName] = signal
|
s.muxAndDiscoveryCompleteSignals[signalName] = signal
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GenericAPIServer) MuxCompleteSignals() map[string]<-chan struct{} {
|
func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
|
||||||
return s.muxCompleteSignals
|
return s.muxAndDiscoveryCompleteSignals
|
||||||
}
|
}
|
||||||
|
|
||||||
type emptyDelegate struct {
|
type emptyDelegate struct {
|
||||||
@ -329,7 +329,7 @@ func (s emptyDelegate) NextDelegate() DelegationTarget {
|
|||||||
func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
||||||
return preparedGenericAPIServer{nil}
|
return preparedGenericAPIServer{nil}
|
||||||
}
|
}
|
||||||
func (s emptyDelegate) MuxCompleteSignals() map[string]<-chan struct{} {
|
func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
|
||||||
return map[string]<-chan struct{}{}
|
return map[string]<-chan struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,21 +379,21 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
||||||
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
||||||
|
|
||||||
// spawn a new goroutine for closing the MuxComplete signal
|
// spawn a new goroutine for closing the MuxAndDiscoveryComplete signal
|
||||||
// registration happens during construction of the generic api server
|
// registration happens during construction of the generic api server
|
||||||
// the last server in the chain aggregates signals from the previous instances
|
// the last server in the chain aggregates signals from the previous instances
|
||||||
go func() {
|
go func() {
|
||||||
for _, muxInstalledSignal := range s.GenericAPIServer.MuxCompleteSignals() {
|
for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() {
|
||||||
select {
|
select {
|
||||||
case <-muxInstalledSignal:
|
case <-muxAndDiscoveryCompletedSignal:
|
||||||
continue
|
continue
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxComplete.Name())
|
klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.lifecycleSignals.MuxComplete.Signal()
|
s.lifecycleSignals.MuxAndDiscoveryComplete.Signal()
|
||||||
klog.V(1).Infof("%s completed, all registered mux complete signals (%d) have finished", s.lifecycleSignals.MuxComplete.Name(), s.GenericAPIServer.MuxCompleteSignals())
|
klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -379,13 +379,13 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMuxComplete(t *testing.T) {
|
func TestMuxAndDiscoveryComplete(t *testing.T) {
|
||||||
// setup
|
// setup
|
||||||
testSignal := make(chan struct{})
|
testSignal := make(chan struct{})
|
||||||
testSignal2 := make(chan struct{})
|
testSignal2 := make(chan struct{})
|
||||||
s := newGenericAPIServer(t, true)
|
s := newGenericAPIServer(t, true)
|
||||||
s.muxCompleteSignals["TestSignal"] = testSignal
|
s.muxAndDiscoveryCompleteSignals["TestSignal"] = testSignal
|
||||||
s.muxCompleteSignals["TestSignal2"] = testSignal2
|
s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2
|
||||||
doer := setupDoer(t, s.SecureServingInfo)
|
doer := setupDoer(t, s.SecureServingInfo)
|
||||||
isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool {
|
isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool {
|
||||||
time.Sleep(delay)
|
time.Sleep(delay)
|
||||||
@ -406,18 +406,18 @@ func TestMuxComplete(t *testing.T) {
|
|||||||
waitForAPIServerStarted(t, doer)
|
waitForAPIServerStarted(t, doer)
|
||||||
|
|
||||||
// act
|
// act
|
||||||
if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) {
|
if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
||||||
t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxComplete.Name())
|
t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
close(testSignal)
|
close(testSignal)
|
||||||
if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) {
|
if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
||||||
t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxComplete.Name())
|
t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
close(testSignal2)
|
close(testSignal2)
|
||||||
if !isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) {
|
if !isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
||||||
t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxComplete.Name())
|
t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
||||||
|
@ -485,36 +485,36 @@ func TestNotRestRoutesHaveAuth(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMuxCompleteSignals(t *testing.T) {
|
func TestMuxAndDiscoveryCompleteSignals(t *testing.T) {
|
||||||
// setup
|
// setup
|
||||||
cfg, assert := setUp(t)
|
cfg, assert := setUp(t)
|
||||||
|
|
||||||
// scenario 1: single server with some mux signals
|
// scenario 1: single server with some mux signals
|
||||||
root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate())
|
root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate())
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
if len(root.MuxCompleteSignals()) != 0 {
|
if len(root.MuxAndDiscoveryCompleteSignals()) != 0 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
root.RegisterMuxCompleteSignal("rootTestSignal", make(chan struct{}))
|
root.RegisterMuxAndDiscoveryCompleteSignal("rootTestSignal", make(chan struct{}))
|
||||||
if len(root.MuxCompleteSignals()) != 1 {
|
if len(root.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// scenario 2: multiple servers with some mux signals
|
// scenario 2: multiple servers with some mux signals
|
||||||
delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate())
|
delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate())
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
delegate.RegisterMuxCompleteSignal("delegateTestSignal", make(chan struct{}))
|
delegate.RegisterMuxAndDiscoveryCompleteSignal("delegateTestSignal", make(chan struct{}))
|
||||||
if len(delegate.MuxCompleteSignals()) != 1 {
|
if len(delegate.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
newRoot, err := cfg.Complete(nil).New("newRootServer", delegate)
|
newRoot, err := cfg.Complete(nil).New("newRootServer", delegate)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
if len(newRoot.MuxCompleteSignals()) != 1 {
|
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
newRoot.RegisterMuxCompleteSignal("newRootTestSignal", make(chan struct{}))
|
newRoot.RegisterMuxAndDiscoveryCompleteSignal("newRootTestSignal", make(chan struct{}))
|
||||||
if len(newRoot.MuxCompleteSignals()) != 2 {
|
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 2 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,10 +131,10 @@ type lifecycleSignals struct {
|
|||||||
// HasBeenReady is signaled when the readyz endpoint succeeds for the first time.
|
// HasBeenReady is signaled when the readyz endpoint succeeds for the first time.
|
||||||
HasBeenReady lifecycleSignal
|
HasBeenReady lifecycleSignal
|
||||||
|
|
||||||
// MuxComplete is signaled when all known HTTP paths have been installed.
|
// MuxAndDiscoveryComplete is signaled when all known HTTP paths have been installed.
|
||||||
// It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
// It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
||||||
// The actual logic is implemented by an APIServer using the generic server library.
|
// The actual logic is implemented by an APIServer using the generic server library.
|
||||||
MuxComplete lifecycleSignal
|
MuxAndDiscoveryComplete lifecycleSignal
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
|
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
|
||||||
@ -146,7 +146,7 @@ func newLifecycleSignals() lifecycleSignals {
|
|||||||
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
||||||
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
|
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
|
||||||
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),
|
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),
|
||||||
MuxComplete: newNamedChannelWrapper("MuxComplete"),
|
MuxAndDiscoveryComplete: newNamedChannelWrapper("MuxAndDiscoveryComplete"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,17 +35,17 @@ import (
|
|||||||
//
|
//
|
||||||
// Note that we don't want to add additional checks to the readyz path as it might prevent fixing bricked clusters.
|
// Note that we don't want to add additional checks to the readyz path as it might prevent fixing bricked clusters.
|
||||||
// This specific handler is meant to "protect" requests that arrive before the paths and handlers are fully initialized.
|
// This specific handler is meant to "protect" requests that arrive before the paths and handlers are fully initialized.
|
||||||
func New(serializer runtime.NegotiatedSerializer, hasMuxIncompleteKeyFn func(ctx context.Context) bool) *Handler {
|
func New(serializer runtime.NegotiatedSerializer, isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool) *Handler {
|
||||||
return &Handler{serializer: serializer, hasMuxIncompleteKeyFn: hasMuxIncompleteKeyFn}
|
return &Handler{serializer: serializer, isMuxAndDiscoveryCompleteFn: isMuxAndDiscoveryCompleteFn}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
serializer runtime.NegotiatedSerializer
|
serializer runtime.NegotiatedSerializer
|
||||||
hasMuxIncompleteKeyFn func(ctx context.Context) bool
|
isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
if h.hasMuxIncompleteKeyFn(req.Context()) {
|
if !h.isMuxAndDiscoveryCompleteFn(req.Context()) {
|
||||||
errMsg := fmt.Sprintf("the request has been made before all known HTTP paths have been installed, please try again")
|
errMsg := fmt.Sprintf("the request has been made before all known HTTP paths have been installed, please try again")
|
||||||
err := apierrors.NewServiceUnavailable(errMsg)
|
err := apierrors.NewServiceUnavailable(errMsg)
|
||||||
if err.ErrStatus.Details == nil {
|
if err.ErrStatus.Details == nil {
|
||||||
|
@ -28,10 +28,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestNotFoundHandler(t *testing.T) {
|
func TestNotFoundHandler(t *testing.T) {
|
||||||
hasMuxIncompleteKeyGlobalValue := false
|
isMuxAndDiscoveryCompleteGlobalValue := true
|
||||||
hasMuxIncompleteKeyTestFn := func(ctx context.Context) bool { return hasMuxIncompleteKeyGlobalValue }
|
isMuxAndDiscoveryCompleteTestFn := func(ctx context.Context) bool { return isMuxAndDiscoveryCompleteGlobalValue }
|
||||||
serializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
|
serializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
|
||||||
target := New(serializer, hasMuxIncompleteKeyTestFn)
|
target := New(serializer, isMuxAndDiscoveryCompleteTestFn)
|
||||||
|
|
||||||
// scenario 1: pretend the request has been made after the signal has been ready
|
// scenario 1: pretend the request has been made after the signal has been ready
|
||||||
req := httptest.NewRequest("GET", "http://apiserver.com/apis/flowcontrol.apiserver.k8s.io/v1beta1", nil)
|
req := httptest.NewRequest("GET", "http://apiserver.com/apis/flowcontrol.apiserver.k8s.io/v1beta1", nil)
|
||||||
@ -54,7 +54,7 @@ func TestNotFoundHandler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// scenario 2: pretend the request has been made before the signal has been ready
|
// scenario 2: pretend the request has been made before the signal has been ready
|
||||||
hasMuxIncompleteKeyGlobalValue = true
|
isMuxAndDiscoveryCompleteGlobalValue = false
|
||||||
rw = httptest.NewRecorder()
|
rw = httptest.NewRecorder()
|
||||||
|
|
||||||
target.ServeHTTP(rw, req)
|
target.ServeHTTP(rw, req)
|
||||||
|
@ -187,7 +187,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
//
|
//
|
||||||
// Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work.
|
// Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work.
|
||||||
apiServiceRegistrationControllerInitiated := make(chan struct{})
|
apiServiceRegistrationControllerInitiated := make(chan struct{})
|
||||||
if err := genericServer.RegisterMuxCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {
|
if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user