mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
Merge pull request #104748 from p0lyn0mial/not-found-handler
return 503 for aggregated APIs when the APIServiceRegistrationController hasn't finished installing all known APIServices
This commit is contained in:
commit
7c53095218
@ -38,6 +38,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apiserver/pkg/admission"
|
"k8s.io/apiserver/pkg/admission"
|
||||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||||
|
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||||
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
|
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
|
||||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
@ -47,6 +48,7 @@ import (
|
|||||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
|
"k8s.io/apiserver/pkg/util/notfoundhandler"
|
||||||
"k8s.io/apiserver/pkg/util/webhook"
|
"k8s.io/apiserver/pkg/util/webhook"
|
||||||
clientgoinformers "k8s.io/client-go/informers"
|
clientgoinformers "k8s.io/client-go/informers"
|
||||||
clientgoclientset "k8s.io/client-go/kubernetes"
|
clientgoclientset "k8s.io/client-go/kubernetes"
|
||||||
@ -187,7 +189,9 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
|
|
||||||
|
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
|
||||||
|
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,64 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package filters
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
type muxAndDiscoveryIncompleteKeyType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// muxAndDiscoveryIncompleteKey is a key under which a protection signal for all requests made before the server have installed all known HTTP paths is stored in the request's context
|
||||||
|
muxAndDiscoveryIncompleteKey muxAndDiscoveryIncompleteKeyType = iota
|
||||||
|
)
|
||||||
|
|
||||||
|
// NoMuxAndDiscoveryIncompleteKey checks if the context contains muxAndDiscoveryIncompleteKey.
|
||||||
|
// The presence of the key indicates the request has been made when the HTTP paths weren't installed.
|
||||||
|
func NoMuxAndDiscoveryIncompleteKey(ctx context.Context) bool {
|
||||||
|
muxAndDiscoveryCompleteProtectionKeyValue, _ := ctx.Value(muxAndDiscoveryIncompleteKey).(string)
|
||||||
|
return len(muxAndDiscoveryCompleteProtectionKeyValue) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithMuxAndDiscoveryComplete puts the muxAndDiscoveryIncompleteKey in the context if a request has been made before muxAndDiscoveryCompleteSignal has been ready.
|
||||||
|
// Putting the key protect us from returning a 404 response instead of a 503.
|
||||||
|
// It is especially important for controllers like GC and NS since they act on 404s.
|
||||||
|
//
|
||||||
|
// The presence of the key is checked in the NotFoundHandler (staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go)
|
||||||
|
//
|
||||||
|
// The primary reason this filter exists is to protect from a potential race between the client's requests reaching the NotFoundHandler and the server becoming ready.
|
||||||
|
// Without the protection key a request could still get a 404 response when the registered signals changed their status just slightly before reaching the new handler.
|
||||||
|
// In that case, the presence of the key will make the handler return a 503 instead of a 404.
|
||||||
|
func WithMuxAndDiscoveryComplete(handler http.Handler, muxAndDiscoveryCompleteSignal <-chan struct{}) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
if muxAndDiscoveryCompleteSignal != nil && !isClosed(muxAndDiscoveryCompleteSignal) {
|
||||||
|
req = req.WithContext(context.WithValue(req.Context(), muxAndDiscoveryIncompleteKey, "MuxAndDiscoveryInstallationNotComplete"))
|
||||||
|
}
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// isClosed is a convenience function that simply check if the given chan has been closed
|
||||||
|
func isClosed(ch <-chan struct{}) bool {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,66 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package filters
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestWithMuxAndDiscoveryCompleteProtection(t *testing.T) {
|
||||||
|
scenarios := []struct {
|
||||||
|
name string
|
||||||
|
muxAndDiscoveryCompleteSignal <-chan struct{}
|
||||||
|
expectNoMuxAndDiscoIncompleteKey bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no signals, no key in the ctx",
|
||||||
|
expectNoMuxAndDiscoIncompleteKey: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "signal ready, no key in the ctx",
|
||||||
|
muxAndDiscoveryCompleteSignal: func() chan struct{} { ch := make(chan struct{}); close(ch); return ch }(),
|
||||||
|
expectNoMuxAndDiscoIncompleteKey: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "signal not ready, the key in the ctx",
|
||||||
|
muxAndDiscoveryCompleteSignal: make(chan struct{}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, scenario := range scenarios {
|
||||||
|
t.Run(scenario.name, func(t *testing.T) {
|
||||||
|
// setup
|
||||||
|
var actualContext context.Context
|
||||||
|
delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
actualContext = req.Context()
|
||||||
|
})
|
||||||
|
target := WithMuxAndDiscoveryComplete(delegate, scenario.muxAndDiscoveryCompleteSignal)
|
||||||
|
|
||||||
|
// act
|
||||||
|
req := &http.Request{}
|
||||||
|
target.ServeHTTP(httptest.NewRecorder(), req)
|
||||||
|
|
||||||
|
// validate
|
||||||
|
if scenario.expectNoMuxAndDiscoIncompleteKey != NoMuxAndDiscoveryIncompleteKey(actualContext) {
|
||||||
|
t.Fatalf("expectNoMuxAndDiscoIncompleteKey in the context = %v, does the actual context contain the key = %v", scenario.expectNoMuxAndDiscoIncompleteKey, NoMuxAndDiscoveryIncompleteKey(actualContext))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -578,6 +578,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
handlerChainBuilder := func(handler http.Handler) http.Handler {
|
handlerChainBuilder := func(handler http.Handler) http.Handler {
|
||||||
return c.BuildHandlerChainFunc(handler, c.Config)
|
return c.BuildHandlerChainFunc(handler, c.Config)
|
||||||
}
|
}
|
||||||
|
|
||||||
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
|
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
|
||||||
|
|
||||||
s := &GenericAPIServer{
|
s := &GenericAPIServer{
|
||||||
@ -591,6 +592,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
delegationTarget: delegationTarget,
|
delegationTarget: delegationTarget,
|
||||||
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
|
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
|
||||||
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
|
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
|
||||||
|
Handler: apiServerHandler,
|
||||||
|
|
||||||
|
listedPathProvider: apiServerHandler,
|
||||||
|
|
||||||
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
||||||
ShutdownTimeout: c.RequestTimeout,
|
ShutdownTimeout: c.RequestTimeout,
|
||||||
@ -598,10 +602,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
SecureServingInfo: c.SecureServing,
|
SecureServingInfo: c.SecureServing,
|
||||||
ExternalAddress: c.ExternalAddress,
|
ExternalAddress: c.ExternalAddress,
|
||||||
|
|
||||||
Handler: apiServerHandler,
|
|
||||||
|
|
||||||
listedPathProvider: apiServerHandler,
|
|
||||||
|
|
||||||
openAPIConfig: c.OpenAPIConfig,
|
openAPIConfig: c.OpenAPIConfig,
|
||||||
skipOpenAPIInstallation: c.SkipOpenAPIInstallation,
|
skipOpenAPIInstallation: c.SkipOpenAPIInstallation,
|
||||||
|
|
||||||
@ -626,6 +626,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
StorageVersionManager: c.StorageVersionManager,
|
StorageVersionManager: c.StorageVersionManager,
|
||||||
|
|
||||||
Version: c.Version,
|
Version: c.Version,
|
||||||
|
|
||||||
|
muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -657,6 +659,13 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// register mux signals from the delegated server
|
||||||
|
for k, v := range delegationTarget.MuxAndDiscoveryCompleteSignals() {
|
||||||
|
if err := s.RegisterMuxAndDiscoveryCompleteSignal(k, v); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
genericApiServerHookName := "generic-apiserver-start-informers"
|
genericApiServerHookName := "generic-apiserver-start-informers"
|
||||||
if c.SharedInformerFactory != nil {
|
if c.SharedInformerFactory != nil {
|
||||||
if !s.isPostStartHookRegistered(genericApiServerHookName) {
|
if !s.isPostStartHookRegistered(genericApiServerHookName) {
|
||||||
@ -807,6 +816,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
}
|
}
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
||||||
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
|
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
|
||||||
|
handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
|
||||||
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
|
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
|
||||||
handler = genericapifilters.WithAuditID(handler)
|
handler = genericapifilters.WithAuditID(handler)
|
||||||
return handler
|
return handler
|
||||||
|
@ -298,6 +298,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
|
|||||||
RequestInfoResolver: &request.RequestInfoFactory{},
|
RequestInfoResolver: &request.RequestInfoFactory{},
|
||||||
RequestTimeout: 10 * time.Second,
|
RequestTimeout: 10 * time.Second,
|
||||||
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
|
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
|
||||||
|
lifecycleSignals: newLifecycleSignals(),
|
||||||
}
|
}
|
||||||
|
|
||||||
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -214,6 +214,12 @@ type GenericAPIServer struct {
|
|||||||
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
|
// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
|
||||||
lifecycleSignals lifecycleSignals
|
lifecycleSignals lifecycleSignals
|
||||||
|
|
||||||
|
// muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered.
|
||||||
|
// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
||||||
|
// it is exposed for easier composition of the individual servers.
|
||||||
|
// the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler
|
||||||
|
muxAndDiscoveryCompleteSignals map[string]<-chan struct{}
|
||||||
|
|
||||||
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
|
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
|
||||||
// Server during the graceful termination of the apiserver. If true, we wait
|
// Server during the graceful termination of the apiserver. If true, we wait
|
||||||
// for non longrunning requests in flight to be drained and then initiate a
|
// for non longrunning requests in flight to be drained and then initiate a
|
||||||
@ -247,6 +253,9 @@ type DelegationTarget interface {
|
|||||||
|
|
||||||
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
|
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
|
||||||
PrepareRun() preparedGenericAPIServer
|
PrepareRun() preparedGenericAPIServer
|
||||||
|
|
||||||
|
// MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
|
||||||
|
MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
||||||
@ -270,15 +279,37 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget {
|
|||||||
return s.delegationTarget
|
return s.delegationTarget
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterMuxAndDiscoveryCompleteSignal registers the given signal that will be used to determine if all known
|
||||||
|
// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running.
|
||||||
|
func (s *GenericAPIServer) RegisterMuxAndDiscoveryCompleteSignal(signalName string, signal <-chan struct{}) error {
|
||||||
|
if _, exists := s.muxAndDiscoveryCompleteSignals[signalName]; exists {
|
||||||
|
return fmt.Errorf("%s already registered", signalName)
|
||||||
|
}
|
||||||
|
s.muxAndDiscoveryCompleteSignals[signalName] = signal
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
|
||||||
|
return s.muxAndDiscoveryCompleteSignals
|
||||||
|
}
|
||||||
|
|
||||||
type emptyDelegate struct {
|
type emptyDelegate struct {
|
||||||
|
// handler is called at the end of the delegation chain
|
||||||
|
// when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler.
|
||||||
|
handler http.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEmptyDelegate() DelegationTarget {
|
func NewEmptyDelegate() DelegationTarget {
|
||||||
return emptyDelegate{}
|
return emptyDelegate{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewEmptyDelegateWithCustomHandler allows for registering a custom handler usually for special handling of 404 requests
|
||||||
|
func NewEmptyDelegateWithCustomHandler(handler http.Handler) DelegationTarget {
|
||||||
|
return emptyDelegate{handler}
|
||||||
|
}
|
||||||
|
|
||||||
func (s emptyDelegate) UnprotectedHandler() http.Handler {
|
func (s emptyDelegate) UnprotectedHandler() http.Handler {
|
||||||
return nil
|
return s.handler
|
||||||
}
|
}
|
||||||
func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry {
|
func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry {
|
||||||
return map[string]postStartHookEntry{}
|
return map[string]postStartHookEntry{}
|
||||||
@ -298,6 +329,9 @@ func (s emptyDelegate) NextDelegate() DelegationTarget {
|
|||||||
func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
||||||
return preparedGenericAPIServer{nil}
|
return preparedGenericAPIServer{nil}
|
||||||
}
|
}
|
||||||
|
func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
|
||||||
|
return map[string]<-chan struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
|
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
|
||||||
type preparedGenericAPIServer struct {
|
type preparedGenericAPIServer struct {
|
||||||
@ -345,6 +379,23 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|||||||
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
||||||
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
||||||
|
|
||||||
|
// spawn a new goroutine for closing the MuxAndDiscoveryComplete signal
|
||||||
|
// registration happens during construction of the generic api server
|
||||||
|
// the last server in the chain aggregates signals from the previous instances
|
||||||
|
go func() {
|
||||||
|
for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() {
|
||||||
|
select {
|
||||||
|
case <-muxAndDiscoveryCompletedSignal:
|
||||||
|
continue
|
||||||
|
case <-stopCh:
|
||||||
|
klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.lifecycleSignals.MuxAndDiscoveryComplete.Signal()
|
||||||
|
klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer delayedStopCh.Signal()
|
defer delayedStopCh.Signal()
|
||||||
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
|
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
|
||||||
|
@ -379,6 +379,47 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMuxAndDiscoveryComplete(t *testing.T) {
|
||||||
|
// setup
|
||||||
|
testSignal1 := make(chan struct{})
|
||||||
|
testSignal2 := make(chan struct{})
|
||||||
|
s := newGenericAPIServer(t, true)
|
||||||
|
s.muxAndDiscoveryCompleteSignals["TestSignal1"] = testSignal1
|
||||||
|
s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2
|
||||||
|
doer := setupDoer(t, s.SecureServingInfo)
|
||||||
|
isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool {
|
||||||
|
time.Sleep(delay)
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the API server
|
||||||
|
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(runCompletedCh)
|
||||||
|
s.PrepareRun().Run(stopCh)
|
||||||
|
}()
|
||||||
|
waitForAPIServerStarted(t, doer)
|
||||||
|
|
||||||
|
// act
|
||||||
|
if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
||||||
|
t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
close(testSignal1)
|
||||||
|
if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
||||||
|
t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
close(testSignal2)
|
||||||
|
if !isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) {
|
||||||
|
t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) {
|
||||||
return func(ci httptrace.GotConnInfo) {
|
return func(ci httptrace.GotConnInfo) {
|
||||||
if !ci.Reused {
|
if !ci.Reused {
|
||||||
|
@ -485,6 +485,39 @@ func TestNotRestRoutesHaveAuth(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMuxAndDiscoveryCompleteSignals(t *testing.T) {
|
||||||
|
// setup
|
||||||
|
cfg, assert := setUp(t)
|
||||||
|
|
||||||
|
// scenario 1: single server with some signals
|
||||||
|
root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate())
|
||||||
|
assert.NoError(err)
|
||||||
|
if len(root.MuxAndDiscoveryCompleteSignals()) != 0 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
||||||
|
}
|
||||||
|
root.RegisterMuxAndDiscoveryCompleteSignal("rootTestSignal", make(chan struct{}))
|
||||||
|
if len(root.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// scenario 2: multiple servers with some signals
|
||||||
|
delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate())
|
||||||
|
assert.NoError(err)
|
||||||
|
delegate.RegisterMuxAndDiscoveryCompleteSignal("delegateTestSignal", make(chan struct{}))
|
||||||
|
if len(delegate.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the delegate server", delegate.MuxAndDiscoveryCompleteSignals()))
|
||||||
|
}
|
||||||
|
newRoot, err := cfg.Complete(nil).New("newRootServer", delegate)
|
||||||
|
assert.NoError(err)
|
||||||
|
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 1 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
||||||
|
}
|
||||||
|
newRoot.RegisterMuxAndDiscoveryCompleteSignal("newRootTestSignal", make(chan struct{}))
|
||||||
|
if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 2 {
|
||||||
|
assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type mockAuthorizer struct {
|
type mockAuthorizer struct {
|
||||||
lastURI string
|
lastURI string
|
||||||
}
|
}
|
||||||
|
@ -130,6 +130,11 @@ type lifecycleSignals struct {
|
|||||||
|
|
||||||
// HasBeenReady is signaled when the readyz endpoint succeeds for the first time.
|
// HasBeenReady is signaled when the readyz endpoint succeeds for the first time.
|
||||||
HasBeenReady lifecycleSignal
|
HasBeenReady lifecycleSignal
|
||||||
|
|
||||||
|
// MuxAndDiscoveryComplete is signaled when all known HTTP paths have been installed.
|
||||||
|
// It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
|
||||||
|
// The actual logic is implemented by an APIServer using the generic server library.
|
||||||
|
MuxAndDiscoveryComplete lifecycleSignal
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
|
// newLifecycleSignals returns an instance of lifecycleSignals interface to be used
|
||||||
@ -141,6 +146,7 @@ func newLifecycleSignals() lifecycleSignals {
|
|||||||
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
|
||||||
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
|
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
|
||||||
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),
|
HasBeenReady: newNamedChannelWrapper("HasBeenReady"),
|
||||||
|
MuxAndDiscoveryComplete: newNamedChannelWrapper("MuxAndDiscoveryComplete"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,65 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package notfoundhandler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||||
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
)
|
||||||
|
|
||||||
|
// New returns an HTTP handler that is meant to be executed at the end of the delegation chain.
|
||||||
|
// It checks if the request have been made before the server has installed all known HTTP paths.
|
||||||
|
// In that case it returns a 503 response otherwise it returns a 404.
|
||||||
|
//
|
||||||
|
// Note that we don't want to add additional checks to the readyz path as it might prevent fixing bricked clusters.
|
||||||
|
// This specific handler is meant to "protect" requests that arrive before the paths and handlers are fully initialized.
|
||||||
|
func New(serializer runtime.NegotiatedSerializer, isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool) *Handler {
|
||||||
|
return &Handler{serializer: serializer, isMuxAndDiscoveryCompleteFn: isMuxAndDiscoveryCompleteFn}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Handler struct {
|
||||||
|
serializer runtime.NegotiatedSerializer
|
||||||
|
isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
if !h.isMuxAndDiscoveryCompleteFn(req.Context()) {
|
||||||
|
errMsg := "the request has been made before all known HTTP paths have been installed, please try again"
|
||||||
|
err := apierrors.NewServiceUnavailable(errMsg)
|
||||||
|
if err.ErrStatus.Details == nil {
|
||||||
|
err.ErrStatus.Details = &metav1.StatusDetails{}
|
||||||
|
}
|
||||||
|
err.ErrStatus.Details.RetryAfterSeconds = int32(5)
|
||||||
|
|
||||||
|
gv := schema.GroupVersion{Group: "unknown", Version: "unknown"}
|
||||||
|
requestInfo, ok := apirequest.RequestInfoFrom(req.Context())
|
||||||
|
if ok {
|
||||||
|
gv.Group = requestInfo.APIGroup
|
||||||
|
gv.Version = requestInfo.APIVersion
|
||||||
|
}
|
||||||
|
responsewriters.ErrorNegotiated(err, h.serializer, gv, rw, req)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.NotFound(rw, req)
|
||||||
|
}
|
@ -0,0 +1,74 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package notfoundhandler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNotFoundHandler(t *testing.T) {
|
||||||
|
isMuxAndDiscoveryCompleteGlobalValue := true
|
||||||
|
isMuxAndDiscoveryCompleteTestFn := func(ctx context.Context) bool { return isMuxAndDiscoveryCompleteGlobalValue }
|
||||||
|
serializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
|
||||||
|
target := New(serializer, isMuxAndDiscoveryCompleteTestFn)
|
||||||
|
|
||||||
|
// scenario 1: pretend the request has been made after the signal has been ready
|
||||||
|
req := httptest.NewRequest("GET", "http://apiserver.com/apis/flowcontrol.apiserver.k8s.io/v1beta1", nil)
|
||||||
|
rw := httptest.NewRecorder()
|
||||||
|
|
||||||
|
target.ServeHTTP(rw, req)
|
||||||
|
resp := rw.Result()
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
bodyStr := strings.TrimSuffix(string(body), "\n")
|
||||||
|
|
||||||
|
if resp.StatusCode != 404 {
|
||||||
|
t.Fatalf("unexpected status code %d, expected 503", resp.StatusCode)
|
||||||
|
}
|
||||||
|
expectedMsg := "404 page not found"
|
||||||
|
if bodyStr != expectedMsg {
|
||||||
|
t.Fatalf("unexpected response: %v, expected: %v", bodyStr, expectedMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// scenario 2: pretend the request has been made before the signal has been ready
|
||||||
|
isMuxAndDiscoveryCompleteGlobalValue = false
|
||||||
|
rw = httptest.NewRecorder()
|
||||||
|
|
||||||
|
target.ServeHTTP(rw, req)
|
||||||
|
resp = rw.Result()
|
||||||
|
body, err = io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
bodyStr = strings.TrimSuffix(string(body), "\n")
|
||||||
|
if resp.StatusCode != 503 {
|
||||||
|
t.Fatalf("unexpected status code %d, expected 503", resp.StatusCode)
|
||||||
|
}
|
||||||
|
expectedMsg = `{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the request has been made before all known HTTP paths have been installed, please try again","reason":"ServiceUnavailable","details":{"retryAfterSeconds":5},"code":503}`
|
||||||
|
if bodyStr != expectedMsg {
|
||||||
|
t.Fatalf("unexpected response: %v, expected: %v", bodyStr, expectedMsg)
|
||||||
|
}
|
||||||
|
}
|
@ -117,6 +117,9 @@ type preparedAPIAggregator struct {
|
|||||||
type APIAggregator struct {
|
type APIAggregator struct {
|
||||||
GenericAPIServer *genericapiserver.GenericAPIServer
|
GenericAPIServer *genericapiserver.GenericAPIServer
|
||||||
|
|
||||||
|
// provided for easier embedding
|
||||||
|
APIRegistrationInformers informers.SharedInformerFactory
|
||||||
|
|
||||||
delegateHandler http.Handler
|
delegateHandler http.Handler
|
||||||
|
|
||||||
// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
|
// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
|
||||||
@ -132,9 +135,6 @@ type APIAggregator struct {
|
|||||||
// controller state
|
// controller state
|
||||||
lister listers.APIServiceLister
|
lister listers.APIServiceLister
|
||||||
|
|
||||||
// provided for easier embedding
|
|
||||||
APIRegistrationInformers informers.SharedInformerFactory
|
|
||||||
|
|
||||||
// Information needed to determine routing for the aggregator
|
// Information needed to determine routing for the aggregator
|
||||||
serviceResolver ServiceResolver
|
serviceResolver ServiceResolver
|
||||||
|
|
||||||
@ -181,6 +181,16 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
|
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// apiServiceRegistrationControllerInitiated is closed when APIServiceRegistrationController has finished "installing" all known APIServices.
|
||||||
|
// At this point we know that the proxy handler knows about APIServices and can handle client requests.
|
||||||
|
// Before it might have resulted in a 404 response which could have serious consequences for some controllers like GC and NS
|
||||||
|
//
|
||||||
|
// Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work.
|
||||||
|
apiServiceRegistrationControllerInitiated := make(chan struct{})
|
||||||
|
if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
s := &APIAggregator{
|
s := &APIAggregator{
|
||||||
GenericAPIServer: genericServer,
|
GenericAPIServer: genericServer,
|
||||||
delegateHandler: delegationTarget.UnprotectedHandler(),
|
delegateHandler: delegationTarget.UnprotectedHandler(),
|
||||||
@ -260,11 +270,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
|
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||||
handlerSyncedCh := make(chan struct{})
|
go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
|
||||||
go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh)
|
|
||||||
select {
|
select {
|
||||||
case <-context.StopCh:
|
case <-context.StopCh:
|
||||||
case <-handlerSyncedCh:
|
case <-apiServiceRegistrationControllerInitiated:
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -96,6 +96,12 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.
|
|||||||
}
|
}
|
||||||
delegateSpec, etag, _, err := downloader.Download(handler, "")
|
delegateSpec, etag, _, err := downloader.Download(handler, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// ignore errors for the empty delegate we attach at the end the chain
|
||||||
|
// atm the empty delegate returns 503 when the server hasn't been fully initialized
|
||||||
|
// and the spec downloader only silences 404s
|
||||||
|
if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if delegateSpec == nil {
|
if delegateSpec == nil {
|
||||||
|
1
vendor/modules.txt
vendored
1
vendor/modules.txt
vendored
@ -1566,6 +1566,7 @@ k8s.io/apiserver/pkg/util/flowcontrol/format
|
|||||||
k8s.io/apiserver/pkg/util/flowcontrol/metrics
|
k8s.io/apiserver/pkg/util/flowcontrol/metrics
|
||||||
k8s.io/apiserver/pkg/util/flowcontrol/request
|
k8s.io/apiserver/pkg/util/flowcontrol/request
|
||||||
k8s.io/apiserver/pkg/util/flushwriter
|
k8s.io/apiserver/pkg/util/flushwriter
|
||||||
|
k8s.io/apiserver/pkg/util/notfoundhandler
|
||||||
k8s.io/apiserver/pkg/util/openapi
|
k8s.io/apiserver/pkg/util/openapi
|
||||||
k8s.io/apiserver/pkg/util/proxy
|
k8s.io/apiserver/pkg/util/proxy
|
||||||
k8s.io/apiserver/pkg/util/shufflesharding
|
k8s.io/apiserver/pkg/util/shufflesharding
|
||||||
|
Loading…
Reference in New Issue
Block a user