From 207478c1e6f94b26d8033d5edc7a8a91c69ef716 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 2 Sep 2021 14:22:23 +0200 Subject: [PATCH 1/9] allow for passing a custom handler to the empty delegate --- .../src/k8s.io/apiserver/pkg/server/genericapiserver.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index f3c17b626f3..e67f5fbebad 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -271,14 +271,20 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget { } type emptyDelegate struct { + handler http.Handler } func NewEmptyDelegate() DelegationTarget { return emptyDelegate{} } +// NewEmptyDelegateWithCustomHandler allows for registering a custom handler usually for special handling of 404 requests +func NewEmptyDelegateWithCustomHandler(handler http.Handler) DelegationTarget { + return emptyDelegate{handler} +} + func (s emptyDelegate) UnprotectedHandler() http.Handler { - return nil + return s.handler } func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry { return map[string]postStartHookEntry{} From 7a342a0f8a9b483066235711dada494d70105707 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 15 Oct 2021 17:36:52 +0200 Subject: [PATCH 2/9] kube-apiserver: wires the notFoundHandler --- cmd/kube-apiserver/app/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index fe2eb96cd13..ddd00092955 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -47,6 +47,7 @@ import ( serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + "k8s.io/apiserver/pkg/util/notfoundhandler" "k8s.io/apiserver/pkg/util/webhook" clientgoinformers "k8s.io/client-go/informers" clientgoclientset "k8s.io/client-go/kubernetes" @@ -187,7 +188,9 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan if err != nil { return nil, err } - apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate()) + + notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.HasMuxCompleteProtectionKey) + apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler)) if err != nil { return nil, err } From 53867975e72c7a2d2dd94aac6bd2869411f92094 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 15 Oct 2021 17:37:16 +0200 Subject: [PATCH 3/9] apiserver: indroduces NotFoundHanlder The new handler is meant to be executed at the end of the delegation chain. It simply checks if the request have been made before the server has installed all known HTTP paths. In that case it returns a 503 response otherwise it returns a 404. We don't want to add additional checks to the readyz path as it might prevent fixing bricked clusters. This specific handler is meant to "protect" requests that arrive before the paths and handlers are fully initialized. --- cmd/kube-apiserver/app/server.go | 1 + .../util/notfoundhandler/not_found_handler.go | 66 +++++++++++++++++ .../notfoundhandler/not_found_handler_test.go | 74 +++++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index ddd00092955..5f3b510a7bf 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" + genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" diff --git a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go new file mode 100644 index 00000000000..a828637f572 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go @@ -0,0 +1,66 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notfoundhandler + +import ( + "context" + "fmt" + "net/http" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +// New returns an HTTP handler that is meant to be executed at the end of the delegation chain. +// It checks if the request have been made before the server has installed all known HTTP paths. +// In that case it returns a 503 response otherwise it returns a 404. +// +// Note that we don't want to add additional checks to the readyz path as it might prevent fixing bricked clusters. +// This specific handler is meant to "protect" requests that arrive before the paths and handlers are fully initialized. +func New(serializer runtime.NegotiatedSerializer, hasMuxIncompleteKeyFn func(ctx context.Context) bool) *Handler { + return &Handler{serializer: serializer, hasMuxIncompleteKeyFn: hasMuxIncompleteKeyFn} +} + +type Handler struct { + serializer runtime.NegotiatedSerializer + hasMuxIncompleteKeyFn func(ctx context.Context) bool +} + +func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + if h.hasMuxIncompleteKeyFn(req.Context()) { + errMsg := fmt.Sprintf("the request has been made before all known HTTP paths have been installed, please try again") + err := apierrors.NewServiceUnavailable(errMsg) + if err.ErrStatus.Details == nil { + err.ErrStatus.Details = &metav1.StatusDetails{} + } + err.ErrStatus.Details.RetryAfterSeconds = int32(5) + + gv := schema.GroupVersion{Group: "unknown", Version: "unknown"} + requestInfo, ok := apirequest.RequestInfoFrom(req.Context()) + if ok { + gv.Group = requestInfo.APIGroup + gv.Version = requestInfo.APIVersion + } + responsewriters.ErrorNegotiated(err, h.serializer, gv, rw, req) + return + } + http.NotFound(rw, req) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go new file mode 100644 index 00000000000..0906e5cb12c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notfoundhandler + +import ( + "context" + "io" + "net/http/httptest" + "strings" + "testing" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +func TestNotFoundHandler(t *testing.T) { + hasMuxIncompleteKeyGlobalValue := false + hasMuxIncompleteKeyTestFn := func(ctx context.Context) bool { return hasMuxIncompleteKeyGlobalValue } + serializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() + target := New(serializer, hasMuxIncompleteKeyTestFn) + + // scenario 1: pretend the request has been made after the signal has been ready + req := httptest.NewRequest("GET", "http://apiserver.com/apis/flowcontrol.apiserver.k8s.io/v1beta1", nil) + rw := httptest.NewRecorder() + + target.ServeHTTP(rw, req) + resp := rw.Result() + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + bodyStr := strings.TrimSuffix(string(body), "\n") + + if resp.StatusCode != 404 { + t.Fatalf("unexpected status code %d, expected 503", resp.StatusCode) + } + expectedMsg := "404 page not found" + if bodyStr != expectedMsg { + t.Fatalf("unexpected response: %v, expected: %v", bodyStr, expectedMsg) + } + + // scenario 2: pretend the request has been made before the signal has been ready + hasMuxIncompleteKeyGlobalValue = true + rw = httptest.NewRecorder() + + target.ServeHTTP(rw, req) + resp = rw.Result() + body, err = io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + bodyStr = strings.TrimSuffix(string(body), "\n") + if resp.StatusCode != 503 { + t.Fatalf("unexpected status code %d, expected 503", resp.StatusCode) + } + expectedMsg = `{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"the request has been made before all known HTTP paths have been installed, please try again","reason":"ServiceUnavailable","details":{"retryAfterSeconds":5},"code":503}` + if bodyStr != expectedMsg { + t.Fatalf("unexpected response: %v, expected: %v", bodyStr, expectedMsg) + } +} From b92ff2928a13de4310e1361af6f8ea1dce9e15d0 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Tue, 12 Oct 2021 11:40:37 +0200 Subject: [PATCH 4/9] update modules.txt --- vendor/modules.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/vendor/modules.txt b/vendor/modules.txt index 8bf74f6ec50..a4426b88caf 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1567,6 +1567,7 @@ k8s.io/apiserver/pkg/util/flowcontrol/format k8s.io/apiserver/pkg/util/flowcontrol/metrics k8s.io/apiserver/pkg/util/flowcontrol/request k8s.io/apiserver/pkg/util/flushwriter +k8s.io/apiserver/pkg/util/notfoundhandler k8s.io/apiserver/pkg/util/openapi k8s.io/apiserver/pkg/util/proxy k8s.io/apiserver/pkg/util/shufflesharding From b71fa61b79598b723c3ee23217e0b44564d90b52 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 14 Oct 2021 14:25:54 +0200 Subject: [PATCH 5/9] apiserver: adds WithMuxCompleteProtection filter It puts the muxCompleteProtectionKey in the context if a request has been made before muxCompleteSignal has been ready. Putting the key protect us from returning a 404 response instead of a 503. It is especially important for controllers like GC and NS since they act on 404s. The presence of the key is checked in the NotFoundHandler (staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go) The race may happen when a request reaches the NotFoundHandler because not all paths have been registered in the mux but when the registered checks are examined in the handler they indicate that the paths have been actually installed. In that case, the presence of the key will make the handler return 503 instead of 404. --- .../pkg/endpoints/filters/mux_complete.go | 64 ++++++++++++++++++ .../endpoints/filters/mux_complete_test.go | 65 +++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete.go new file mode 100644 index 00000000000..1c17b06aa63 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete.go @@ -0,0 +1,64 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "context" + "net/http" +) + +type muxCompleteProtectionKeyType int + +const ( + // muxCompleteProtectionKey is a key under which a protection signal for all requests made before the server have installed all known HTTP paths is stored in the request's context + muxCompleteProtectionKey muxCompleteProtectionKeyType = iota +) + +// HasMuxCompleteProtectionKey checks if the context contains muxCompleteProtectionKey. +// The presence of the key indicates the request has been made when the HTTP paths weren't installed. +func HasMuxCompleteProtectionKey(ctx context.Context) bool { + muxCompleteProtectionKeyValue, _ := ctx.Value(muxCompleteProtectionKey).(string) + return len(muxCompleteProtectionKeyValue) != 0 +} + +// WithMuxCompleteProtection puts the muxCompleteProtectionKey in the context if a request has been made before muxCompleteSignal has been ready. +// Putting the key protect us from returning a 404 response instead of a 503. +// It is especially important for controllers like GC and NS since they act on 404s. +// +// The presence of the key is checked in the NotFoundHandler (staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go) +// +// The race may happen when a request reaches the NotFoundHandler because not all paths have been registered in the mux +// but when the registered checks are examined in the handler they indicate that the paths have been actually installed. +// In that case, the presence of the key will make the handler return 503 instead of 404. +func WithMuxCompleteProtection(handler http.Handler, muxCompleteSignal <-chan struct{}) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if muxCompleteSignal != nil && !isClosed(muxCompleteSignal) { + req = req.WithContext(context.WithValue(req.Context(), muxCompleteProtectionKey, "MuxInstallationNotComplete")) + } + handler.ServeHTTP(w, req) + }) +} + +// isClosed is a convenience function that simply check if the given chan has been closed +func isClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete_test.go new file mode 100644 index 00000000000..daaa7504e67 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" +) + +func TestWithMuxCompleteProtectionFilter(t *testing.T) { + scenarios := []struct { + name string + muxCompleteSignal <-chan struct{} + expectMuxCompleteProtectionKey bool + }{ + { + name: "no signals, no protection key in the ctx", + }, + { + name: "signal ready, no protection key in the ctx", + muxCompleteSignal: func() chan struct{} { ch := make(chan struct{}); close(ch); return ch }(), + }, + { + name: "signal not ready, the protection key in the ctx", + muxCompleteSignal: make(chan struct{}), + expectMuxCompleteProtectionKey: true, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + // setup + var actualContext context.Context + delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + actualContext = req.Context() + }) + target := WithMuxCompleteProtection(delegate, scenario.muxCompleteSignal) + + // act + req := &http.Request{} + target.ServeHTTP(httptest.NewRecorder(), req) + + // validate + if scenario.expectMuxCompleteProtectionKey != HasMuxCompleteProtectionKey(actualContext) { + t.Fatalf("expectMuxCompleteProtectionKey in the context = %v, does the actual context contain the key = %v", scenario.expectMuxCompleteProtectionKey, HasMuxCompleteProtectionKey(actualContext)) + } + }) + } +} From ddfbb5d2bb57ee44b3e10f0b58f9cc7001f55802 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 15 Oct 2021 18:14:20 +0200 Subject: [PATCH 6/9] genericapiserver: indroduce muxCompleteSignals for holding signals that indicate all known HTTP paths have been registered the new field exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. it is exposed for easier composition of the individual servers. the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler. --- .../src/k8s.io/apiserver/pkg/server/config.go | 18 ++++++-- .../apiserver/pkg/server/config_test.go | 1 + .../apiserver/pkg/server/genericapiserver.go | 45 +++++++++++++++++++ ...ericapiserver_graceful_termination_test.go | 41 +++++++++++++++++ .../pkg/server/genericapiserver_test.go | 33 ++++++++++++++ .../apiserver/pkg/server/lifecycle_signals.go | 6 +++ 6 files changed, 140 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 524c8c0f5ae..3e3ad59daf4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -578,6 +578,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G handlerChainBuilder := func(handler http.Handler) http.Handler { return c.BuildHandlerChainFunc(handler, c.Config) } + apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()) s := &GenericAPIServer{ @@ -591,6 +592,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G delegationTarget: delegationTarget, EquivalentResourceRegistry: c.EquivalentResourceRegistry, HandlerChainWaitGroup: c.HandlerChainWaitGroup, + Handler: apiServerHandler, + + listedPathProvider: apiServerHandler, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, ShutdownTimeout: c.RequestTimeout, @@ -598,10 +602,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G SecureServingInfo: c.SecureServing, ExternalAddress: c.ExternalAddress, - Handler: apiServerHandler, - - listedPathProvider: apiServerHandler, - openAPIConfig: c.OpenAPIConfig, skipOpenAPIInstallation: c.SkipOpenAPIInstallation, @@ -626,6 +626,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G StorageVersionManager: c.StorageVersionManager, Version: c.Version, + + muxCompleteSignals: map[string]<-chan struct{}{}, } for { @@ -657,6 +659,13 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } } + // register mux signals from the delegated server + for k, v := range delegationTarget.MuxCompleteSignals() { + if err := s.RegisterMuxCompleteSignal(k, v); err != nil { + return nil, err + } + } + genericApiServerHookName := "generic-apiserver-start-informers" if c.SharedInformerFactory != nil { if !s.isPostStartHookRegistered(genericApiServerHookName) { @@ -807,6 +816,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) handler = genericapifilters.WithRequestReceivedTimestamp(handler) + handler = genericapifilters.WithMuxCompleteProtection(handler, c.lifecycleSignals.MuxComplete.Signaled()) handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver) handler = genericapifilters.WithAuditID(handler) return handler diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 01c3cbafcb8..67bf529f9f3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -298,6 +298,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { RequestInfoResolver: &request.RequestInfoFactory{}, RequestTimeout: 10 * time.Second, LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false }, + lifecycleSignals: newLifecycleSignals(), } h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index e67f5fbebad..4f326415d0e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -214,6 +214,12 @@ type GenericAPIServer struct { // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver. lifecycleSignals lifecycleSignals + // muxCompleteSignals holds signals that indicate all known HTTP paths have been registered. + // it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. + // it is exposed for easier composition of the individual servers. + // the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler + muxCompleteSignals map[string]<-chan struct{} + // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP // Server during the graceful termination of the apiserver. If true, we wait // for non longrunning requests in flight to be drained and then initiate a @@ -247,6 +253,9 @@ type DelegationTarget interface { // PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates. PrepareRun() preparedGenericAPIServer + + // MuxCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed. + MuxCompleteSignals() map[string]<-chan struct{} } func (s *GenericAPIServer) UnprotectedHandler() http.Handler { @@ -270,7 +279,23 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget { return s.delegationTarget } +// RegisterMuxCompleteSignal registers the given signal that will be used to determine if all known +// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running. +func (s *GenericAPIServer) RegisterMuxCompleteSignal(signalName string, signal <-chan struct{}) error { + if _, exists := s.muxCompleteSignals[signalName]; exists { + return fmt.Errorf("%s already registered", signalName) + } + s.muxCompleteSignals[signalName] = signal + return nil +} + +func (s *GenericAPIServer) MuxCompleteSignals() map[string]<-chan struct{} { + return s.muxCompleteSignals +} + type emptyDelegate struct { + // handler is called at the end of the delegation chain + // when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler. handler http.Handler } @@ -304,6 +329,9 @@ func (s emptyDelegate) NextDelegate() DelegationTarget { func (s emptyDelegate) PrepareRun() preparedGenericAPIServer { return preparedGenericAPIServer{nil} } +func (s emptyDelegate) MuxCompleteSignals() map[string]<-chan struct{} { + return map[string]<-chan struct{}{} +} // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked. type preparedGenericAPIServer struct { @@ -351,6 +379,23 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated + // spawn a new goroutine for closing the MuxComplete signal + // registration happens during construction of the generic api server + // the last server in the chain aggregates signals from the previous instances + go func() { + for _, muxInstalledSignal := range s.GenericAPIServer.MuxCompleteSignals() { + select { + case <-muxInstalledSignal: + continue + case <-stopCh: + klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxComplete.Name()) + return + } + } + s.lifecycleSignals.MuxComplete.Signal() + klog.V(1).Infof("%s completed, all registered mux complete signals (%d) have finished", s.lifecycleSignals.MuxComplete.Name(), s.GenericAPIServer.MuxCompleteSignals()) + }() + go func() { defer delayedStopCh.Signal() defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name()) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index d7956c9f64e..2f462408b55 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -379,6 +379,47 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t }() } +func TestMuxComplete(t *testing.T) { + // setup + testSignal := make(chan struct{}) + testSignal2 := make(chan struct{}) + s := newGenericAPIServer(t, true) + s.muxCompleteSignals["TestSignal"] = testSignal + s.muxCompleteSignals["TestSignal2"] = testSignal2 + doer := setupDoer(t, s.SecureServingInfo) + isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool { + time.Sleep(delay) + select { + case <-ch: + return true + default: + return false + } + } + + // start the API server + stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(runCompletedCh) + s.PrepareRun().Run(stopCh) + }() + waitForAPIServerStarted(t, doer) + + // act + if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxComplete.Name()) + } + + close(testSignal) + if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxComplete.Name()) + } + + close(testSignal2) + if !isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxComplete.Name()) + } +} func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) { return func(ci httptrace.GotConnInfo) { if !ci.Reused { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 17f8d810409..29841aa6de0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -485,6 +485,39 @@ func TestNotRestRoutesHaveAuth(t *testing.T) { } } +func TestMuxCompleteSignals(t *testing.T) { + // setup + cfg, assert := setUp(t) + + // scenario 1: single server with some mux signals + root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate()) + assert.NoError(err) + if len(root.MuxCompleteSignals()) != 0 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals())) + } + root.RegisterMuxCompleteSignal("rootTestSignal", make(chan struct{})) + if len(root.MuxCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals())) + } + + // scenario 2: multiple servers with some mux signals + delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate()) + assert.NoError(err) + delegate.RegisterMuxCompleteSignal("delegateTestSignal", make(chan struct{})) + if len(delegate.MuxCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxCompleteSignals())) + } + newRoot, err := cfg.Complete(nil).New("newRootServer", delegate) + assert.NoError(err) + if len(newRoot.MuxCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals())) + } + newRoot.RegisterMuxCompleteSignal("newRootTestSignal", make(chan struct{})) + if len(newRoot.MuxCompleteSignals()) != 2 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals())) + } +} + type mockAuthorizer struct { lastURI string } diff --git a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index 1a939e24fe4..bcd7fd33cf7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -130,6 +130,11 @@ type lifecycleSignals struct { // HasBeenReady is signaled when the readyz endpoint succeeds for the first time. HasBeenReady lifecycleSignal + + // MuxComplete is signaled when all known HTTP paths have been installed. + // It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. + // The actual logic is implemented by an APIServer using the generic server library. + MuxComplete lifecycleSignal } // newLifecycleSignals returns an instance of lifecycleSignals interface to be used @@ -141,6 +146,7 @@ func newLifecycleSignals() lifecycleSignals { InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"), HasBeenReady: newNamedChannelWrapper("HasBeenReady"), + MuxComplete: newNamedChannelWrapper("MuxComplete"), } } From c54463d379a9e84a3d40184be7a4829ec79c4ea5 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 15 Oct 2021 17:36:38 +0200 Subject: [PATCH 7/9] kube-aggregator: registeres APIServiceRegistrationControllerInitiated as a MuxComplete signal apiServiceRegistrationControllerInitiated is closed when APIServiceRegistrationController has finished "installing" all known APIServices. At this point we know that the proxy handler knows about APIServices and can handle client requests. Before it might have resulted in a 404 response which could have serious consequences for some controllers like GC and NS --- .../kube-aggregator/pkg/apiserver/apiserver.go | 17 ++++++++++++++--- .../openapi/aggregator/aggregator.go | 6 ++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 8a80eb65b79..eddf54ada0e 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -117,6 +117,9 @@ type preparedAPIAggregator struct { type APIAggregator struct { GenericAPIServer *genericapiserver.GenericAPIServer + // provided for easier embedding + APIRegistrationInformers informers.SharedInformerFactory + delegateHandler http.Handler // proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity @@ -132,9 +135,6 @@ type APIAggregator struct { // controller state lister listers.APIServiceLister - // provided for easier embedding - APIRegistrationInformers informers.SharedInformerFactory - // Information needed to determine routing for the aggregator serviceResolver ServiceResolver @@ -181,6 +181,16 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. ) + // apiServiceRegistrationControllerInitiated is closed when APIServiceRegistrationController has finished "installing" all known APIServices. + // At this point we know that the proxy handler knows about APIServices and can handle client requests. + // Before it might have resulted in a 404 response which could have serious consequences for some controllers like GC and NS + // + // Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work. + apiServiceRegistrationControllerInitiated := make(chan struct{}) + if err := genericServer.RegisterMuxCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil { + return nil, err + } + s := &APIAggregator{ GenericAPIServer: genericServer, delegateHandler: delegationTarget.UnprotectedHandler(), @@ -265,6 +275,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg select { case <-context.StopCh: case <-handlerSyncedCh: + close(apiServiceRegistrationControllerInitiated) } return nil diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go index 693c635357f..f9b78ddaa54 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go @@ -96,6 +96,12 @@ func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server. } delegateSpec, etag, _, err := downloader.Download(handler, "") if err != nil { + // ignore errors for the empty delegate we attach at the end the chain + // atm the empty delegate returns 503 when the server hasn't been fully initialized + // and the spec downloader only silences 404s + if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil { + continue + } return nil, err } if delegateSpec == nil { From 9e2bdfee02a6851fbb13ffe28611e9d2b6242785 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Tue, 19 Oct 2021 12:24:00 +0200 Subject: [PATCH 8/9] rename to muxAndDiscoveryComplete --- cmd/kube-apiserver/app/server.go | 2 +- ..._complete.go => mux_discovery_complete.go} | 22 ++++++------ ...test.go => mux_discovery_complete_test.go} | 27 ++++++++------- .../src/k8s.io/apiserver/pkg/server/config.go | 8 ++--- .../apiserver/pkg/server/genericapiserver.go | 34 +++++++++---------- ...ericapiserver_graceful_termination_test.go | 18 +++++----- .../pkg/server/genericapiserver_test.go | 28 +++++++-------- .../apiserver/pkg/server/lifecycle_signals.go | 6 ++-- .../util/notfoundhandler/not_found_handler.go | 10 +++--- .../notfoundhandler/not_found_handler_test.go | 8 ++--- .../pkg/apiserver/apiserver.go | 2 +- 11 files changed, 83 insertions(+), 82 deletions(-) rename staging/src/k8s.io/apiserver/pkg/endpoints/filters/{mux_complete.go => mux_discovery_complete.go} (60%) rename staging/src/k8s.io/apiserver/pkg/endpoints/filters/{mux_complete_test.go => mux_discovery_complete_test.go} (50%) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 5f3b510a7bf..0e730789e83 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -190,7 +190,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan return nil, err } - notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.HasMuxCompleteProtectionKey) + notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey) apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler)) if err != nil { return nil, err diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go similarity index 60% rename from staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete.go rename to staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go index 1c17b06aa63..818f648f849 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go @@ -21,21 +21,21 @@ import ( "net/http" ) -type muxCompleteProtectionKeyType int +type muxAndDiscoveryIncompleteKeyType int const ( - // muxCompleteProtectionKey is a key under which a protection signal for all requests made before the server have installed all known HTTP paths is stored in the request's context - muxCompleteProtectionKey muxCompleteProtectionKeyType = iota + // muxAndDiscoveryIncompleteKey is a key under which a protection signal for all requests made before the server have installed all known HTTP paths is stored in the request's context + muxAndDiscoveryIncompleteKey muxAndDiscoveryIncompleteKeyType = iota ) -// HasMuxCompleteProtectionKey checks if the context contains muxCompleteProtectionKey. +// NoMuxAndDiscoveryIncompleteKey checks if the context contains muxAndDiscoveryIncompleteKey. // The presence of the key indicates the request has been made when the HTTP paths weren't installed. -func HasMuxCompleteProtectionKey(ctx context.Context) bool { - muxCompleteProtectionKeyValue, _ := ctx.Value(muxCompleteProtectionKey).(string) - return len(muxCompleteProtectionKeyValue) != 0 +func NoMuxAndDiscoveryIncompleteKey(ctx context.Context) bool { + muxAndDiscoveryCompleteProtectionKeyValue, _ := ctx.Value(muxAndDiscoveryIncompleteKey).(string) + return len(muxAndDiscoveryCompleteProtectionKeyValue) == 0 } -// WithMuxCompleteProtection puts the muxCompleteProtectionKey in the context if a request has been made before muxCompleteSignal has been ready. +// WithMuxAndDiscoveryComplete puts the muxAndDiscoveryIncompleteKey in the context if a request has been made before muxAndDiscoveryCompleteSignal has been ready. // Putting the key protect us from returning a 404 response instead of a 503. // It is especially important for controllers like GC and NS since they act on 404s. // @@ -44,10 +44,10 @@ func HasMuxCompleteProtectionKey(ctx context.Context) bool { // The race may happen when a request reaches the NotFoundHandler because not all paths have been registered in the mux // but when the registered checks are examined in the handler they indicate that the paths have been actually installed. // In that case, the presence of the key will make the handler return 503 instead of 404. -func WithMuxCompleteProtection(handler http.Handler, muxCompleteSignal <-chan struct{}) http.Handler { +func WithMuxAndDiscoveryComplete(handler http.Handler, muxAndDiscoveryCompleteSignal <-chan struct{}) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if muxCompleteSignal != nil && !isClosed(muxCompleteSignal) { - req = req.WithContext(context.WithValue(req.Context(), muxCompleteProtectionKey, "MuxInstallationNotComplete")) + if muxAndDiscoveryCompleteSignal != nil && !isClosed(muxAndDiscoveryCompleteSignal) { + req = req.WithContext(context.WithValue(req.Context(), muxAndDiscoveryIncompleteKey, "MuxAndDiscoveryInstallationNotComplete")) } handler.ServeHTTP(w, req) }) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete_test.go similarity index 50% rename from staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete_test.go rename to staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete_test.go index daaa7504e67..d569b6d8971 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_complete_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete_test.go @@ -23,23 +23,24 @@ import ( "testing" ) -func TestWithMuxCompleteProtectionFilter(t *testing.T) { +func TestWithMuxAndDiscoveryCompleteProtection(t *testing.T) { scenarios := []struct { - name string - muxCompleteSignal <-chan struct{} - expectMuxCompleteProtectionKey bool + name string + muxAndDiscoveryCompleteSignal <-chan struct{} + expectNoMuxAndDiscoIncompleteKey bool }{ { - name: "no signals, no protection key in the ctx", + name: "no signals, no key in the ctx", + expectNoMuxAndDiscoIncompleteKey: true, }, { - name: "signal ready, no protection key in the ctx", - muxCompleteSignal: func() chan struct{} { ch := make(chan struct{}); close(ch); return ch }(), + name: "signal ready, no key in the ctx", + muxAndDiscoveryCompleteSignal: func() chan struct{} { ch := make(chan struct{}); close(ch); return ch }(), + expectNoMuxAndDiscoIncompleteKey: true, }, { - name: "signal not ready, the protection key in the ctx", - muxCompleteSignal: make(chan struct{}), - expectMuxCompleteProtectionKey: true, + name: "signal not ready, the key in the ctx", + muxAndDiscoveryCompleteSignal: make(chan struct{}), }, } @@ -50,15 +51,15 @@ func TestWithMuxCompleteProtectionFilter(t *testing.T) { delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { actualContext = req.Context() }) - target := WithMuxCompleteProtection(delegate, scenario.muxCompleteSignal) + target := WithMuxAndDiscoveryComplete(delegate, scenario.muxAndDiscoveryCompleteSignal) // act req := &http.Request{} target.ServeHTTP(httptest.NewRecorder(), req) // validate - if scenario.expectMuxCompleteProtectionKey != HasMuxCompleteProtectionKey(actualContext) { - t.Fatalf("expectMuxCompleteProtectionKey in the context = %v, does the actual context contain the key = %v", scenario.expectMuxCompleteProtectionKey, HasMuxCompleteProtectionKey(actualContext)) + if scenario.expectNoMuxAndDiscoIncompleteKey != NoMuxAndDiscoveryIncompleteKey(actualContext) { + t.Fatalf("expectNoMuxAndDiscoIncompleteKey in the context = %v, does the actual context contain the key = %v", scenario.expectNoMuxAndDiscoIncompleteKey, NoMuxAndDiscoveryIncompleteKey(actualContext)) } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 3e3ad59daf4..965989811f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -627,7 +627,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G Version: c.Version, - muxCompleteSignals: map[string]<-chan struct{}{}, + muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{}, } for { @@ -660,8 +660,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } // register mux signals from the delegated server - for k, v := range delegationTarget.MuxCompleteSignals() { - if err := s.RegisterMuxCompleteSignal(k, v); err != nil { + for k, v := range delegationTarget.MuxAndDiscoveryCompleteSignals() { + if err := s.RegisterMuxAndDiscoveryCompleteSignal(k, v); err != nil { return nil, err } } @@ -816,7 +816,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { } handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) handler = genericapifilters.WithRequestReceivedTimestamp(handler) - handler = genericapifilters.WithMuxCompleteProtection(handler, c.lifecycleSignals.MuxComplete.Signaled()) + handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled()) handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver) handler = genericapifilters.WithAuditID(handler) return handler diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 4f326415d0e..4308e835d0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -214,11 +214,11 @@ type GenericAPIServer struct { // lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver. lifecycleSignals lifecycleSignals - // muxCompleteSignals holds signals that indicate all known HTTP paths have been registered. + // muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered. // it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. // it is exposed for easier composition of the individual servers. // the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler - muxCompleteSignals map[string]<-chan struct{} + muxAndDiscoveryCompleteSignals map[string]<-chan struct{} // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP // Server during the graceful termination of the apiserver. If true, we wait @@ -254,8 +254,8 @@ type DelegationTarget interface { // PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates. PrepareRun() preparedGenericAPIServer - // MuxCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed. - MuxCompleteSignals() map[string]<-chan struct{} + // MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed. + MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} } func (s *GenericAPIServer) UnprotectedHandler() http.Handler { @@ -279,18 +279,18 @@ func (s *GenericAPIServer) NextDelegate() DelegationTarget { return s.delegationTarget } -// RegisterMuxCompleteSignal registers the given signal that will be used to determine if all known +// RegisterMuxAndDiscoveryCompleteSignal registers the given signal that will be used to determine if all known // HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running. -func (s *GenericAPIServer) RegisterMuxCompleteSignal(signalName string, signal <-chan struct{}) error { - if _, exists := s.muxCompleteSignals[signalName]; exists { +func (s *GenericAPIServer) RegisterMuxAndDiscoveryCompleteSignal(signalName string, signal <-chan struct{}) error { + if _, exists := s.muxAndDiscoveryCompleteSignals[signalName]; exists { return fmt.Errorf("%s already registered", signalName) } - s.muxCompleteSignals[signalName] = signal + s.muxAndDiscoveryCompleteSignals[signalName] = signal return nil } -func (s *GenericAPIServer) MuxCompleteSignals() map[string]<-chan struct{} { - return s.muxCompleteSignals +func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} { + return s.muxAndDiscoveryCompleteSignals } type emptyDelegate struct { @@ -329,7 +329,7 @@ func (s emptyDelegate) NextDelegate() DelegationTarget { func (s emptyDelegate) PrepareRun() preparedGenericAPIServer { return preparedGenericAPIServer{nil} } -func (s emptyDelegate) MuxCompleteSignals() map[string]<-chan struct{} { +func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} { return map[string]<-chan struct{}{} } @@ -379,21 +379,21 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated - // spawn a new goroutine for closing the MuxComplete signal + // spawn a new goroutine for closing the MuxAndDiscoveryComplete signal // registration happens during construction of the generic api server // the last server in the chain aggregates signals from the previous instances go func() { - for _, muxInstalledSignal := range s.GenericAPIServer.MuxCompleteSignals() { + for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() { select { - case <-muxInstalledSignal: + case <-muxAndDiscoveryCompletedSignal: continue case <-stopCh: - klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxComplete.Name()) + klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) return } } - s.lifecycleSignals.MuxComplete.Signal() - klog.V(1).Infof("%s completed, all registered mux complete signals (%d) have finished", s.lifecycleSignals.MuxComplete.Name(), s.GenericAPIServer.MuxCompleteSignals()) + s.lifecycleSignals.MuxAndDiscoveryComplete.Signal() + klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) }() go func() { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index 2f462408b55..32a74285265 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -379,13 +379,13 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t }() } -func TestMuxComplete(t *testing.T) { +func TestMuxAndDiscoveryComplete(t *testing.T) { // setup testSignal := make(chan struct{}) testSignal2 := make(chan struct{}) s := newGenericAPIServer(t, true) - s.muxCompleteSignals["TestSignal"] = testSignal - s.muxCompleteSignals["TestSignal2"] = testSignal2 + s.muxAndDiscoveryCompleteSignals["TestSignal"] = testSignal + s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2 doer := setupDoer(t, s.SecureServingInfo) isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool { time.Sleep(delay) @@ -406,18 +406,18 @@ func TestMuxComplete(t *testing.T) { waitForAPIServerStarted(t, doer) // act - if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { - t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxComplete.Name()) + if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) } close(testSignal) - if isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { - t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxComplete.Name()) + if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) } close(testSignal2) - if !isChanClosed(s.lifecycleSignals.MuxComplete.Signaled(), 1*time.Second) { - t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxComplete.Name()) + if !isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) { + t.Fatalf("%s wasn't closed", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) } } func shouldReuseConnection(t *testing.T) func(httptrace.GotConnInfo) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 29841aa6de0..df14f3d1215 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -485,36 +485,36 @@ func TestNotRestRoutesHaveAuth(t *testing.T) { } } -func TestMuxCompleteSignals(t *testing.T) { +func TestMuxAndDiscoveryCompleteSignals(t *testing.T) { // setup cfg, assert := setUp(t) // scenario 1: single server with some mux signals root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate()) assert.NoError(err) - if len(root.MuxCompleteSignals()) != 0 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals())) + if len(root.MuxAndDiscoveryCompleteSignals()) != 0 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals())) } - root.RegisterMuxCompleteSignal("rootTestSignal", make(chan struct{})) - if len(root.MuxCompleteSignals()) != 1 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxCompleteSignals())) + root.RegisterMuxAndDiscoveryCompleteSignal("rootTestSignal", make(chan struct{})) + if len(root.MuxAndDiscoveryCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals())) } // scenario 2: multiple servers with some mux signals delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate()) assert.NoError(err) - delegate.RegisterMuxCompleteSignal("delegateTestSignal", make(chan struct{})) - if len(delegate.MuxCompleteSignals()) != 1 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxCompleteSignals())) + delegate.RegisterMuxAndDiscoveryCompleteSignal("delegateTestSignal", make(chan struct{})) + if len(delegate.MuxAndDiscoveryCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxAndDiscoveryCompleteSignals())) } newRoot, err := cfg.Complete(nil).New("newRootServer", delegate) assert.NoError(err) - if len(newRoot.MuxCompleteSignals()) != 1 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals())) + if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 1 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) } - newRoot.RegisterMuxCompleteSignal("newRootTestSignal", make(chan struct{})) - if len(newRoot.MuxCompleteSignals()) != 2 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxCompleteSignals())) + newRoot.RegisterMuxAndDiscoveryCompleteSignal("newRootTestSignal", make(chan struct{})) + if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 2 { + assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go index bcd7fd33cf7..6b406072b61 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go +++ b/staging/src/k8s.io/apiserver/pkg/server/lifecycle_signals.go @@ -131,10 +131,10 @@ type lifecycleSignals struct { // HasBeenReady is signaled when the readyz endpoint succeeds for the first time. HasBeenReady lifecycleSignal - // MuxComplete is signaled when all known HTTP paths have been installed. + // MuxAndDiscoveryComplete is signaled when all known HTTP paths have been installed. // It exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler. // The actual logic is implemented by an APIServer using the generic server library. - MuxComplete lifecycleSignal + MuxAndDiscoveryComplete lifecycleSignal } // newLifecycleSignals returns an instance of lifecycleSignals interface to be used @@ -146,7 +146,7 @@ func newLifecycleSignals() lifecycleSignals { InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"), HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"), HasBeenReady: newNamedChannelWrapper("HasBeenReady"), - MuxComplete: newNamedChannelWrapper("MuxComplete"), + MuxAndDiscoveryComplete: newNamedChannelWrapper("MuxAndDiscoveryComplete"), } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go index a828637f572..1a8d3aac180 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go +++ b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go @@ -35,17 +35,17 @@ import ( // // Note that we don't want to add additional checks to the readyz path as it might prevent fixing bricked clusters. // This specific handler is meant to "protect" requests that arrive before the paths and handlers are fully initialized. -func New(serializer runtime.NegotiatedSerializer, hasMuxIncompleteKeyFn func(ctx context.Context) bool) *Handler { - return &Handler{serializer: serializer, hasMuxIncompleteKeyFn: hasMuxIncompleteKeyFn} +func New(serializer runtime.NegotiatedSerializer, isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool) *Handler { + return &Handler{serializer: serializer, isMuxAndDiscoveryCompleteFn: isMuxAndDiscoveryCompleteFn} } type Handler struct { - serializer runtime.NegotiatedSerializer - hasMuxIncompleteKeyFn func(ctx context.Context) bool + serializer runtime.NegotiatedSerializer + isMuxAndDiscoveryCompleteFn func(ctx context.Context) bool } func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - if h.hasMuxIncompleteKeyFn(req.Context()) { + if !h.isMuxAndDiscoveryCompleteFn(req.Context()) { errMsg := fmt.Sprintf("the request has been made before all known HTTP paths have been installed, please try again") err := apierrors.NewServiceUnavailable(errMsg) if err.ErrStatus.Details == nil { diff --git a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go index 0906e5cb12c..57a67200683 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler_test.go @@ -28,10 +28,10 @@ import ( ) func TestNotFoundHandler(t *testing.T) { - hasMuxIncompleteKeyGlobalValue := false - hasMuxIncompleteKeyTestFn := func(ctx context.Context) bool { return hasMuxIncompleteKeyGlobalValue } + isMuxAndDiscoveryCompleteGlobalValue := true + isMuxAndDiscoveryCompleteTestFn := func(ctx context.Context) bool { return isMuxAndDiscoveryCompleteGlobalValue } serializer := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() - target := New(serializer, hasMuxIncompleteKeyTestFn) + target := New(serializer, isMuxAndDiscoveryCompleteTestFn) // scenario 1: pretend the request has been made after the signal has been ready req := httptest.NewRequest("GET", "http://apiserver.com/apis/flowcontrol.apiserver.k8s.io/v1beta1", nil) @@ -54,7 +54,7 @@ func TestNotFoundHandler(t *testing.T) { } // scenario 2: pretend the request has been made before the signal has been ready - hasMuxIncompleteKeyGlobalValue = true + isMuxAndDiscoveryCompleteGlobalValue = false rw = httptest.NewRecorder() target.ServeHTTP(rw, req) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index eddf54ada0e..4733dcfc711 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -187,7 +187,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg // // Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work. apiServiceRegistrationControllerInitiated := make(chan struct{}) - if err := genericServer.RegisterMuxCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil { + if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil { return nil, err } From 5116a508a7bf84844f4987ab2db14af88bfd296f Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Wed, 20 Oct 2021 11:06:27 +0200 Subject: [PATCH 9/9] aggregator: pass apiServiceRegistrationControllerInitiated signal directly to apiserviceRegistration controller --- .../endpoints/filters/mux_discovery_complete.go | 6 +++--- .../genericapiserver_graceful_termination_test.go | 6 +++--- .../apiserver/pkg/server/genericapiserver_test.go | 14 +++++++------- .../pkg/util/notfoundhandler/not_found_handler.go | 3 +-- .../kube-aggregator/pkg/apiserver/apiserver.go | 6 ++---- 5 files changed, 16 insertions(+), 19 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go index 818f648f849..d2fee3b1587 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/mux_discovery_complete.go @@ -41,9 +41,9 @@ func NoMuxAndDiscoveryIncompleteKey(ctx context.Context) bool { // // The presence of the key is checked in the NotFoundHandler (staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go) // -// The race may happen when a request reaches the NotFoundHandler because not all paths have been registered in the mux -// but when the registered checks are examined in the handler they indicate that the paths have been actually installed. -// In that case, the presence of the key will make the handler return 503 instead of 404. +// The primary reason this filter exists is to protect from a potential race between the client's requests reaching the NotFoundHandler and the server becoming ready. +// Without the protection key a request could still get a 404 response when the registered signals changed their status just slightly before reaching the new handler. +// In that case, the presence of the key will make the handler return a 503 instead of a 404. func WithMuxAndDiscoveryComplete(handler http.Handler, muxAndDiscoveryCompleteSignal <-chan struct{}) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if muxAndDiscoveryCompleteSignal != nil && !isClosed(muxAndDiscoveryCompleteSignal) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index 32a74285265..6c84271f0e2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -381,10 +381,10 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t func TestMuxAndDiscoveryComplete(t *testing.T) { // setup - testSignal := make(chan struct{}) + testSignal1 := make(chan struct{}) testSignal2 := make(chan struct{}) s := newGenericAPIServer(t, true) - s.muxAndDiscoveryCompleteSignals["TestSignal"] = testSignal + s.muxAndDiscoveryCompleteSignals["TestSignal1"] = testSignal1 s.muxAndDiscoveryCompleteSignals["TestSignal2"] = testSignal2 doer := setupDoer(t, s.SecureServingInfo) isChanClosed := func(ch <-chan struct{}, delay time.Duration) bool { @@ -410,7 +410,7 @@ func TestMuxAndDiscoveryComplete(t *testing.T) { t.Fatalf("%s is closed whereas the TestSignal is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) } - close(testSignal) + close(testSignal1) if isChanClosed(s.lifecycleSignals.MuxAndDiscoveryComplete.Signaled(), 1*time.Second) { t.Fatalf("%s is closed whereas the TestSignal2 is still open", s.lifecycleSignals.MuxAndDiscoveryComplete.Name()) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index df14f3d1215..af452347c83 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -489,32 +489,32 @@ func TestMuxAndDiscoveryCompleteSignals(t *testing.T) { // setup cfg, assert := setUp(t) - // scenario 1: single server with some mux signals + // scenario 1: single server with some signals root, err := cfg.Complete(nil).New("rootServer", NewEmptyDelegate()) assert.NoError(err) if len(root.MuxAndDiscoveryCompleteSignals()) != 0 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals())) + assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals())) } root.RegisterMuxAndDiscoveryCompleteSignal("rootTestSignal", make(chan struct{})) if len(root.MuxAndDiscoveryCompleteSignals()) != 1 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the root server", root.MuxAndDiscoveryCompleteSignals())) + assert.Error(fmt.Errorf("unexpected signals %v registered in the root server", root.MuxAndDiscoveryCompleteSignals())) } - // scenario 2: multiple servers with some mux signals + // scenario 2: multiple servers with some signals delegate, err := cfg.Complete(nil).New("delegateServer", NewEmptyDelegate()) assert.NoError(err) delegate.RegisterMuxAndDiscoveryCompleteSignal("delegateTestSignal", make(chan struct{})) if len(delegate.MuxAndDiscoveryCompleteSignals()) != 1 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the delegate server", delegate.MuxAndDiscoveryCompleteSignals())) + assert.Error(fmt.Errorf("unexpected signals %v registered in the delegate server", delegate.MuxAndDiscoveryCompleteSignals())) } newRoot, err := cfg.Complete(nil).New("newRootServer", delegate) assert.NoError(err) if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 1 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) + assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) } newRoot.RegisterMuxAndDiscoveryCompleteSignal("newRootTestSignal", make(chan struct{})) if len(newRoot.MuxAndDiscoveryCompleteSignals()) != 2 { - assert.Error(fmt.Errorf("unexpected mux signals registered %v in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) + assert.Error(fmt.Errorf("unexpected signals %v registered in the newRoot server", newRoot.MuxAndDiscoveryCompleteSignals())) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go index 1a8d3aac180..93c1d0a0454 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go +++ b/staging/src/k8s.io/apiserver/pkg/util/notfoundhandler/not_found_handler.go @@ -18,7 +18,6 @@ package notfoundhandler import ( "context" - "fmt" "net/http" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -46,7 +45,7 @@ type Handler struct { func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if !h.isMuxAndDiscoveryCompleteFn(req.Context()) { - errMsg := fmt.Sprintf("the request has been made before all known HTTP paths have been installed, please try again") + errMsg := "the request has been made before all known HTTP paths have been installed, please try again" err := apierrors.NewServiceUnavailable(errMsg) if err.ErrStatus.Details == nil { err.ErrStatus.Details = &metav1.StatusDetails{} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 4733dcfc711..4558130c364 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -270,12 +270,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { - handlerSyncedCh := make(chan struct{}) - go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh) + go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated) select { case <-context.StopCh: - case <-handlerSyncedCh: - close(apiServiceRegistrationControllerInitiated) + case <-apiServiceRegistrationControllerInitiated: } return nil