diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index fe2eb96cd13..0e730789e83 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" + genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" @@ -47,6 +48,7 @@ import ( serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + "k8s.io/apiserver/pkg/util/notfoundhandler" "k8s.io/apiserver/pkg/util/webhook" clientgoinformers "k8s.io/client-go/informers" clientgoclientset "k8s.io/client-go/kubernetes" @@ -187,7 +189,9 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan if err != nil { return nil, err } - apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate()) + + notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey) + apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler)) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go new file mode 100644 index 00000000000..d2fee3b1587 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go @@ -0,0 +1,64 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "context" + "net/http" +) + +type muxAndDiscoveryIncompleteKeyType int + +const ( + // muxAndDiscoveryIncompleteKey is a key under which a protection signal for all requests made before the server have installed all known HTTP paths is stored in the request's context + muxAndDiscoveryIncompleteKey muxAndDiscoveryIncompleteKeyType = iota +) + +// NoMuxAndDiscoveryIncompleteKey checks if the context contains muxAndDiscoveryIncompleteKey. +// The presence of the key indicates the request has been made when the HTTP paths weren't installed. +func NoMuxAndDiscoveryIncompleteKey(ctx context.Context) bool { + muxAndDiscoveryCompleteProtectionKeyValue, _ := ctx.Value(muxAndDiscoveryIncompleteKey).(string) + return len(muxAndDiscoveryCompleteProtectionKeyValue) == 0 +} + +// WithMuxAndDiscoveryComplete puts the muxAndDiscoveryIncompleteKey in the context if a request has been made before muxAndDiscoveryCompleteSignal has been ready. +// Putting the key protect us from returning a 404 response instead of a 503. +// It is especially important for controllers like GC and NS since they act on 404s. +// +// The presence of the key is checked in the NotFoundHandler (staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go) +// +// The primary reason this filter exists is to protect from a potential race between the client's requests reaching the NotFoundHandler and the server becoming ready. +// Without the protection key a request could still get a 404 response when the registered signals changed their status just slightly before reaching the new handler. +// In that case, the presence of the key will make the handler return a 503 instead of a 404. +func WithMuxAndDiscoveryComplete(handler http.Handler, muxAndDiscoveryCompleteSignal <-chan struct{}) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if muxAndDiscoveryCompleteSignal != nil && !isClosed(muxAndDiscoveryCompleteSignal) { + req = req.WithContext(context.WithValue(req.Context(), muxAndDiscoveryIncompleteKey, "MuxAndDiscoveryInstallationNotComplete")) + } + handler.ServeHTTP(w, req) + }) +} + +// isClosed is a convenience function that simply check if the given chan has been closed +func isClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete_test.go new file mode 100644 index 00000000000..d569b6d8971 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" +) + +func TestWithMuxAndDiscoveryCompleteProtection(t *testing.T) { + scenarios := []struct { + name string + muxAndDiscoveryCompleteSignal <-chan struct{} + expectNoMuxAndDiscoIncompleteKey bool + }{ + { + name: "no signals, no key in the ctx", + expectNoMuxAndDiscoIncompleteKey: true, + }, + { + name: "signal ready, no key in the ctx", + muxAndDiscoveryCompleteSignal: func() chan struct{} { ch := make(chan struct{}); close(ch); return ch }(), + expectNoMuxAndDiscoIncompleteKey: true, + }, + { + name: "signal not ready, the key in the ctx", + muxAndDiscoveryCompleteSignal: make(chan struct{}), + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + // setup + var actualContext context.Context + delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + actualContext = req.Context() + }) + target := WithMuxAndDiscoveryComplete(delegate, scenario.muxAndDiscoveryCompleteSignal) + + // act + req := &http.Request{} + target.ServeHTTP(httptest.NewRecorder(), req) + + // validate + if scenario.expectNoMuxAndDiscoIncompleteKey != NoMuxAndDiscoveryIncompleteKey(actualContext) { + t.Fatalf("expectNoMuxAndDiscoIncompleteKey in the context = %v, does the actual context contain the key = %v", scenario.expectNoMuxAndDiscoIncompleteKey, NoMuxAndDiscoveryIncompleteKey(actualContext)) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 48d6ab6242c..55e40584948 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -578,6 +578,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G handlerChainBuilder := func(handler http.Handler) http.Handler { return c.BuildHandlerChainFunc(handler, c.Config) } + apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()) s := &GenericAPIServer{ @@ -591,6 +592,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G delegationTarget: delegationTarget, EquivalentResourceRegistry: c.EquivalentResourceRegistry, HandlerChainWaitGroup: c.HandlerChainWaitGroup, + Handler: apiServerHandler, + + listedPathProvider: apiServerHandler, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, ShutdownTimeout: c.RequestTimeout, @@ -598,10 +602,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G SecureServingInfo: c.SecureServing, ExternalAddress: c.ExternalAddress, - Handler: apiServerHandler, - - listedPathProvider: apiServerHandler, - openAPIConfig: c.OpenAPIConfig, skipOpenAPIInstallation: c.SkipOpenAPIInstallation, @@ -626,6 +626,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G StorageVersionManager: c.StorageVersionManager, Version: c.Version, + + muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{}, } for { @@ -657,6 +659,13 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } } + // register mux signals from the delegated server + for k, v := range delegationTarget.MuxAndDiscoveryCompleteSignals() { + if err := s.RegisterMuxAndDiscoveryCompleteSignal(k, v); err != nil { + return nil, err + } + } + genericApiServerHookName := "generic-apiserver-start-informers" if c.SharedInformerFactory != nil { if !s.isPostStartHookRegistered(genericApiServerHookName) { @@ -807,6 +816,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) handler = genericapifilters.WithRequestReceivedTimestamp(handler) + handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled()) handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver) handler = genericapifilters.WithAuditID(handler) return handler diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 01c3cbafcb8..67bf529f9f3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -298,6 +298,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { RequestInfoResolver: &request.RequestInfoFactory{}, RequestTimeout: 10 * time.Second, LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false }, + lifecycleSignals: newLifecycleSignals(), } h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index f3c17b626f3..4308e835d0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -214,6 +214,12 @@ type GenericAPIServer struct { // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver. lifecycleSignals lifecycleSignals + // muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered. + // it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. + // it is exposed for easier composition of the individual servers. + // the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler + muxAndDiscoveryCompleteSignals map[string]<-chan struct{} + // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP // Server during the graceful termination of the apiserver. If true, we wait // for non longrunning requests in flight to be drained and then initiate a @@ -247,6 +253,9 @@ type DelegationTarget interface { // PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates. PrepareRun() preparedGenericAPIServer + + // MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed. + MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} } func (s *GenericAPIServer) UnprotectedHandler() http.Handler { @@ -270,15 +279,37 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget { return s.delegationTarget } +// RegisterMuxAndDiscoveryCompleteSignal registers the given signal that will be used to determine if all known +// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running. +func (s *GenericAPIServer) RegisterMuxAndDiscoveryCompleteSignal(signalName string, signal <-chan struct{}) error { + if _, exists := s.muxAndDiscoveryCompleteSignals[signalName]; exists { + return fmt.Errorf("%s already registered", signalName) + } + s.muxAndDiscoveryCompleteSignals[signalName] = signal + return nil +} + +func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} { + return s.muxAndDiscoveryCompleteSignals +} + type emptyDelegate struct { + // handler is called at the end of the delegation chain + // when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler. + handler http.Handler } func NewEmptyDelegate() DelegationTarget { return emptyDelegate{} } +// NewEmptyDelegateWithCustomHandler allows for registering a custom handler usually for special handling of 404 requests +func NewEmptyDelegateWithCustomHandler(handler http.Handler) DelegationTarget { + return emptyDelegate{handler} +} + func (s emptyDelegate) UnprotectedHandler() http.Handler { - return nil + return s.handler } func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry { return map[string]postStartHookEntry{} @@ -298,6 +329,9 @@ func (s emptyDelegate) NextDelegate() DelegationTarget { func (s emptyDelegate) PrepareRun() preparedGenericAPIServer { return preparedGenericAPIServer{nil} } +func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} { + return map[string]<-chan struct{}{} +} // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked. type preparedGenericAPIServer struct { @@ -345,6 +379,23 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated + // spawn a new goroutine for closing the MuxAndDiscoveryComplete signal + // registration happens during construction of the generic api server + // the last server in the chain aggregates signals from the previous instances + go func() { + for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() { + select { + case <-muxAndDiscoveryCompletedSignal: + continue + case <-stopCh: + klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) + return + } + } + s.lifecycleSignals.MuxAndDiscoveryComplete.Signal() + klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) + }() + go func() { defer delayedStopCh.Signal() defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name()) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index d7956c9f64e..6c84271f0e2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -379,6 +379,47 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t }() } +func TestMuxAndDiscoveryComplete(t *testing.T) { + // setup + testSignal1 := make(chan struct{}) + testSignal2 := make(chan struct{}) + s := newGenericAPIServer(t, true) + s.muxAndDiscoveryCompleteSignals["TestSignal1"] = testSignal1 + s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2 + doer := setupDoer(t, s.SecureServingInfo) + isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool { + time.Sleep(delay) + select { + case <-ch: + return true + default: + return false + } + } + + // start the API server + stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(runCompletedCh) + s.PrepareRun().Run(stopCh) + }() + waitForAPIServerStarted(t, doer) + + // act + if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) + } + + close(testSignal1) + if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) + } + + close(testSignal2) + if !isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) + } +} func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) { return func(ci httptrace.GotConnInfo) { if !ci.Reused { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 17f8d810409..af452347c83 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -485,6 +485,39 @@ func TestNotRestRoutesHaveAuth(t *testing.T) { } } +func TestMuxAndDiscoveryCompleteSignals(t *testing.T) { + // setup + cfg, assert := setUp(t) + + // scenario 1: single server with some signals + root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate()) + assert.NoError(err) + if len(root.MuxAndDiscoveryCompleteSignals()) != 0 { + assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals())) + } + root.RegisterMuxAndDiscoveryCompleteSignal("rootTestSignal", make(chan struct{})) + if len(root.MuxAndDiscoveryCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals())) + } + + // scenario 2: multiple servers with some signals + delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate()) + assert.NoError(err) + delegate.RegisterMuxAndDiscoveryCompleteSignal("delegateTestSignal", make(chan struct{})) + if len(delegate.MuxAndDiscoveryCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected signals %v registered in the delegate server", delegate.MuxAndDiscoveryCompleteSignals())) + } + newRoot, err := cfg.Complete(nil).New("newRootServer", delegate) + assert.NoError(err) + if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) + } + newRoot.RegisterMuxAndDiscoveryCompleteSignal("newRootTestSignal", make(chan struct{})) + if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 2 { + assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) + } +} + type mockAuthorizer struct { lastURI string } diff --git a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index 1a939e24fe4..6b406072b61 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -130,6 +130,11 @@ type lifecycleSignals struct { // HasBeenReady is signaled when the readyz endpoint succeeds for the first time. HasBeenReady lifecycleSignal + + // MuxAndDiscoveryComplete is signaled when all known HTTP paths have been installed. + // It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. + // The actual logic is implemented by an APIServer using the generic server library. + MuxAndDiscoveryComplete lifecycleSignal } // newLifecycleSignals returns an instance of lifecycleSignals interface to be used @@ -141,6 +146,7 @@ func newLifecycleSignals() lifecycleSignals { InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"), HasBeenReady: newNamedChannelWrapper("HasBeenReady"), + MuxAndDiscoveryComplete: newNamedChannelWrapper("MuxAndDiscoveryComplete"), } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go new file mode 100644 index 00000000000..93c1d0a0454 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go @@ -0,0 +1,65 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notfoundhandler + +import ( + "context" + "net/http" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +// New returns an HTTP handler that is meant to be executed at the end of the delegation chain. +// It checks if the request have been made before the server has installed all known HTTP paths. +// In that case it returns a 503 response otherwise it returns a 404. +// +// Note that we don't want to add additional checks to the readyz path as it might prevent fixing bricked clusters. +// This specific handler is meant to "protect" requests that arrive before the paths and handlers are fully initialized. +func New(serializer runtime.NegotiatedSerializer, isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool) *Handler { + return &Handler{serializer: serializer, isMuxAndDiscoveryCompleteFn: isMuxAndDiscoveryCompleteFn} +} + +type Handler struct { + serializer runtime.NegotiatedSerializer + isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool +} + +func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + if !h.isMuxAndDiscoveryCompleteFn(req.Context()) { + errMsg := "the request has been made before all known HTTP paths have been installed, please try again" + err := apierrors.NewServiceUnavailable(errMsg) + if err.ErrStatus.Details == nil { + err.ErrStatus.Details = &metav1.StatusDetails{} + } + err.ErrStatus.Details.RetryAfterSeconds = int32(5) + + gv := schema.GroupVersion{Group: "unknown", Version: "unknown"} + requestInfo, ok := apirequest.RequestInfoFrom(req.Context()) + if ok { + gv.Group = requestInfo.APIGroup + gv.Version = requestInfo.APIVersion + } + responsewriters.ErrorNegotiated(err, h.serializer, gv, rw, req) + return + } + http.NotFound(rw, req) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go new file mode 100644 index 00000000000..57a67200683 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notfoundhandler + +import ( + "context" + "io" + "net/http/httptest" + "strings" + "testing" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +func TestNotFoundHandler(t *testing.T) { + isMuxAndDiscoveryCompleteGlobalValue := true + isMuxAndDiscoveryCompleteTestFn := func(ctx context.Context) bool { return isMuxAndDiscoveryCompleteGlobalValue } + serializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() + target := New(serializer, isMuxAndDiscoveryCompleteTestFn) + + // scenario 1: pretend the request has been made after the signal has been ready + req := httptest.NewRequest("GET", "http://apiserver.com/apis/flowcontrol.apiserver.k8s.io/v1beta1", nil) + rw := httptest.NewRecorder() + + target.ServeHTTP(rw, req) + resp := rw.Result() + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + bodyStr := strings.TrimSuffix(string(body), "\n") + + if resp.StatusCode != 404 { + t.Fatalf("unexpected status code %d, expected 503", resp.StatusCode) + } + expectedMsg := "404 page not found" + if bodyStr != expectedMsg { + t.Fatalf("unexpected response: %v, expected: %v", bodyStr, expectedMsg) + } + + // scenario 2: pretend the request has been made before the signal has been ready + isMuxAndDiscoveryCompleteGlobalValue = false + rw = httptest.NewRecorder() + + target.ServeHTTP(rw, req) + resp = rw.Result() + body, err = io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + bodyStr = strings.TrimSuffix(string(body), "\n") + if resp.StatusCode != 503 { + t.Fatalf("unexpected status code %d, expected 503", resp.StatusCode) + } + expectedMsg = `{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the request has been made before all known HTTP paths have been installed, please try again","reason":"ServiceUnavailable","details":{"retryAfterSeconds":5},"code":503}` + if bodyStr != expectedMsg { + t.Fatalf("unexpected response: %v, expected: %v", bodyStr, expectedMsg) + } +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 8a80eb65b79..4558130c364 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -117,6 +117,9 @@ type preparedAPIAggregator struct { type APIAggregator struct { GenericAPIServer *genericapiserver.GenericAPIServer + // provided for easier embedding + APIRegistrationInformers informers.SharedInformerFactory + delegateHandler http.Handler // proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity @@ -132,9 +135,6 @@ type APIAggregator struct { // controller state lister listers.APIServiceLister - // provided for easier embedding - APIRegistrationInformers informers.SharedInformerFactory - // Information needed to determine routing for the aggregator serviceResolver ServiceResolver @@ -181,6 +181,16 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. ) + // apiServiceRegistrationControllerInitiated is closed when APIServiceRegistrationController has finished "installing" all known APIServices. + // At this point we know that the proxy handler knows about APIServices and can handle client requests. + // Before it might have resulted in a 404 response which could have serious consequences for some controllers like GC and NS + // + // Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work. + apiServiceRegistrationControllerInitiated := make(chan struct{}) + if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil { + return nil, err + } + s := &APIAggregator{ GenericAPIServer: genericServer, delegateHandler: delegationTarget.UnprotectedHandler(), @@ -260,11 +270,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { - handlerSyncedCh := make(chan struct{}) - go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh) + go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated) select { case <-context.StopCh: - case <-handlerSyncedCh: + case <-apiServiceRegistrationControllerInitiated: } return nil diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 693c635357f..f9b78ddaa54 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -96,6 +96,12 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server. } delegateSpec, etag, _, err := downloader.Download(handler, "") if err != nil { + // ignore errors for the empty delegate we attach at the end the chain + // atm the empty delegate returns 503 when the server hasn't been fully initialized + // and the spec downloader only silences 404s + if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil { + continue + } return nil, err } if delegateSpec == nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index 0d2275b654c..02047c3685d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1566,6 +1566,7 @@ k8s.io/apiserver/pkg/util/flowcontrol/format k8s.io/apiserver/pkg/util/flowcontrol/metrics k8s.io/apiserver/pkg/util/flowcontrol/request k8s.io/apiserver/pkg/util/flushwriter +k8s.io/apiserver/pkg/util/notfoundhandler k8s.io/apiserver/pkg/util/openapi k8s.io/apiserver/pkg/util/proxy k8s.io/apiserver/pkg/util/shufflesharding