mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
aggregator: pass apiServiceRegistrationControllerInitiated signal directly to apiserviceRegistration controller
This commit is contained in:
parent
9e2bdfee02
commit
5116a508a7
@ -41,9 +41,9 @@ func NoMuxAndDiscoveryIncompleteKey(ctx context.Context) bool {
|
|||||||
//
|
//
|
||||||
// The presence of the key is checked in the NotFoundHandler (staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go)
|
// The presence of the key is checked in the NotFoundHandler (staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go)
|
||||||
//
|
//
|
||||||
// The race may happen when a request reaches the NotFoundHandler because not all paths have been registered in the mux
|
// The primary reason this filter exists is to protect from a potential race between the client's requests reaching the NotFoundHandler and the server becoming ready.
|
||||||
// but when the registered checks are examined in the handler they indicate that the paths have been actually installed.
|
// Without the protection key a request could still get a 404 response when the registered signals changed their status just slightly before reaching the new handler.
|
||||||
// In that case, the presence of the key will make the handler return 503 instead of 404.
|
// In that case, the presence of the key will make the handler return a 503 instead of a 404.
|
||||||
func WithMuxAndDiscoveryComplete(handler http.Handler, muxAndDiscoveryCompleteSignal <-chan struct{}) http.Handler {
|
func WithMuxAndDiscoveryComplete(handler http.Handler, muxAndDiscoveryCompleteSignal <-chan struct{}) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
if muxAndDiscoveryCompleteSignal != nil && !isClosed(muxAndDiscoveryCompleteSignal) {
|
if muxAndDiscoveryCompleteSignal != nil && !isClosed(muxAndDiscoveryCompleteSignal) {
|
||||||
|
@ -381,10 +381,10 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
|
|||||||
|
|
||||||
func TestMuxAndDiscoveryComplete(t *testing.T) {
|
func TestMuxAndDiscoveryComplete(t *testing.T) {
|
||||||
// setup
|
// setup
|
||||||
testSignal := make(chan struct{})
|
testSignal1 := make(chan struct{})
|
||||||
testSignal2 := make(chan struct{})
|
testSignal2 := make(chan struct{})
|
||||||
s := newGenericAPIServer(t, true)
|
s := newGenericAPIServer(t, true)
|
||||||
s.muxAndDiscoveryCompleteSignals["TestSignal"] = testSignal
|
s.muxAndDiscoveryCompleteSignals["TestSignal1"] = testSignal1
|
||||||
s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2
|
s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2
|
||||||
doer := setupDoer(t, s.SecureServingInfo)
|
doer := setupDoer(t, s.SecureServingInfo)
|
||||||
isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool {
|
isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool {
|
||||||
@ -410,7 +410,7 @@ func TestMuxAndDiscoveryComplete(t *testing.T) {
|
|||||||
t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
close(testSignal)
|
close(testSignal1)
|
||||||
if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
||||||
t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
}
|
}
|
||||||
|
@ -489,32 +489,32 @@ func TestMuxAndDiscoveryCompleteSignals(t *testing.T) {
|
|||||||
// setup
|
// setup
|
||||||
cfg, assert := setUp(t)
|
cfg, assert := setUp(t)
|
||||||
|
|
||||||
// scenario 1: single server with some mux signals
|
// scenario 1: single server with some signals
|
||||||
root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate())
|
root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate())
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
if len(root.MuxAndDiscoveryCompleteSignals()) != 0 {
|
if len(root.MuxAndDiscoveryCompleteSignals()) != 0 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
root.RegisterMuxAndDiscoveryCompleteSignal("rootTestSignal", make(chan struct{}))
|
root.RegisterMuxAndDiscoveryCompleteSignal("rootTestSignal", make(chan struct{}))
|
||||||
if len(root.MuxAndDiscoveryCompleteSignals()) != 1 {
|
if len(root.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// scenario 2: multiple servers with some mux signals
|
// scenario 2: multiple servers with some signals
|
||||||
delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate())
|
delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate())
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
delegate.RegisterMuxAndDiscoveryCompleteSignal("delegateTestSignal", make(chan struct{}))
|
delegate.RegisterMuxAndDiscoveryCompleteSignal("delegateTestSignal", make(chan struct{}))
|
||||||
if len(delegate.MuxAndDiscoveryCompleteSignals()) != 1 {
|
if len(delegate.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxAndDiscoveryCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the delegate server", delegate.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
newRoot, err := cfg.Complete(nil).New("newRootServer", delegate)
|
newRoot, err := cfg.Complete(nil).New("newRootServer", delegate)
|
||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 1 {
|
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
newRoot.RegisterMuxAndDiscoveryCompleteSignal("newRootTestSignal", make(chan struct{}))
|
newRoot.RegisterMuxAndDiscoveryCompleteSignal("newRootTestSignal", make(chan struct{}))
|
||||||
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 2 {
|
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 2 {
|
||||||
assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,7 +18,6 @@ package notfoundhandler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
@ -46,7 +45,7 @@ type Handler struct {
|
|||||||
|
|
||||||
func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
if !h.isMuxAndDiscoveryCompleteFn(req.Context()) {
|
if !h.isMuxAndDiscoveryCompleteFn(req.Context()) {
|
||||||
errMsg := fmt.Sprintf("the request has been made before all known HTTP paths have been installed, please try again")
|
errMsg := "the request has been made before all known HTTP paths have been installed, please try again"
|
||||||
err := apierrors.NewServiceUnavailable(errMsg)
|
err := apierrors.NewServiceUnavailable(errMsg)
|
||||||
if err.ErrStatus.Details == nil {
|
if err.ErrStatus.Details == nil {
|
||||||
err.ErrStatus.Details = &metav1.StatusDetails{}
|
err.ErrStatus.Details = &metav1.StatusDetails{}
|
||||||
|
@ -270,12 +270,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
|
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||||
handlerSyncedCh := make(chan struct{})
|
go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
|
||||||
go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh)
|
|
||||||
select {
|
select {
|
||||||
case <-context.StopCh:
|
case <-context.StopCh:
|
||||||
case <-handlerSyncedCh:
|
case <-apiServiceRegistrationControllerInitiated:
|
||||||
close(apiServiceRegistrationControllerInitiated)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user