From bccef75d7ae43be333e068748bb6f998dafa6d9d Mon Sep 17 00:00:00 2001 From: deads2k Date: Mon, 13 Mar 2017 15:45:46 -0400 Subject: [PATCH] allow combining API servers --- .../src/k8s.io/apiserver/pkg/server/config.go | 70 +++++++- .../apiserver/pkg/server/config_test.go | 152 ++++++++++++++++++ .../apiserver/pkg/server/genericapiserver.go | 36 +++++ .../apiserver/pkg/server/mux/container.go | 13 ++ .../apiserver/pkg/server/mux/pathrecorder.go | 10 +- .../pkg/server/mux/pathrecorder_test.go | 4 +- .../apiserver/pkg/server/routes/index.go | 43 +++-- vendor/BUILD | 7 +- 8 files changed, 312 insertions(+), 23 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/config_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 059ef95635b..63dc5f04ab2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -376,6 +376,15 @@ func (c *Config) SkipComplete() completedConfig { // auth, then the caller should create a handler for those endpoints, which delegates the // any unhandled paths to "Handler". func (c completedConfig) New() (*GenericAPIServer, error) { + s, err := c.constructServer() + if err != nil { + return nil, err + } + + return c.buildHandlers(s, nil) +} + +func (c completedConfig) constructServer() (*GenericAPIServer, error) { if c.Serializer == nil { return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil") } @@ -383,6 +392,8 @@ func (c completedConfig) New() (*GenericAPIServer, error) { return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil") } + handlerContainer := mux.NewAPIContainer(http.NewServeMux(), c.Serializer, c.FallThroughHandler) + s := &GenericAPIServer{ discoveryAddresses: c.DiscoveryAddresses, LoopbackClientConfig: c.LoopbackClientConfig, @@ -399,8 +410,11 @@ func (c completedConfig) New() (*GenericAPIServer, error) { apiGroupsForDiscovery: map[string]metav1.APIGroup{}, + HandlerContainer: handlerContainer, FallThroughHandler: c.FallThroughHandler, + listedPathProvider: routes.ListedPathProviders{handlerContainer, c.FallThroughHandler}, + swaggerConfig: c.SwaggerConfig, openAPIConfig: c.OpenAPIConfig, @@ -408,8 +422,48 @@ func (c completedConfig) New() (*GenericAPIServer, error) { healthzChecks: c.HealthzChecks, } - s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer, s.FallThroughHandler) + return s, nil +} +// NewWithDelegate creates a new server which logically combines the handling chain with the passed server. +func (c completedConfig) NewWithDelegate(delegationTarget DelegationTarget) (*GenericAPIServer, error) { + // some pieces of the delegationTarget take precendence. Callers should already have ensured that these + // were wired correctly. Documenting them here. + // c.RequestContextMapper = delegationTarget.RequestContextMapper() + + s, err := c.constructServer() + if err != nil { + return nil, err + } + + for k, v := range delegationTarget.PostStartHooks() { + s.postStartHooks[k] = v + } + + for _, delegateCheck := range delegationTarget.HealthzChecks() { + skip := false + for _, existingCheck := range c.HealthzChecks { + if existingCheck.Name() == delegateCheck.Name() { + skip = true + break + } + } + if skip { + continue + } + + s.healthzChecks = append(s.healthzChecks, delegateCheck) + } + + s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget} + + // use the UnprotectedHandler from the delegation target to ensure that we don't attempt to double authenticator, authorize, + // or some other part of the filter chain in delegation cases. + return c.buildHandlers(s, delegationTarget.UnprotectedHandler()) +} + +// buildHandlers builds our handling chain +func (c completedConfig) buildHandlers(s *GenericAPIServer, delegate http.Handler) (*GenericAPIServer, error) { if s.openAPIConfig != nil { if s.openAPIConfig.Info == nil { s.openAPIConfig.Info = &spec.Info{} @@ -423,7 +477,7 @@ func (c completedConfig) New() (*GenericAPIServer, error) { } } - s.installAPI(c.Config) + installAPI(s, c.Config, delegate) s.Handler, s.InsecureHandler = c.BuildHandlerChainsFunc(s.HandlerContainer.ServeMux, c.Config) @@ -454,9 +508,15 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insec return generic(protect(apiHandler)), generic(audit(apiHandler)) } -func (s *GenericAPIServer) installAPI(c *Config) { - if c.EnableIndex { - routes.Index{}.Install(s.HandlerContainer, s.FallThroughHandler) +func installAPI(s *GenericAPIServer, c *Config, delegate http.Handler) { + switch { + case c.EnableIndex: + routes.Index{}.Install(s.listedPathProvider, c.FallThroughHandler, delegate) + + case delegate != nil: + // if we have a delegate, allow it to handle everything that's unmatched even if + // the index is disabled. + s.FallThroughHandler.UnlistedHandleFunc("/", delegate.ServeHTTP) } if c.SwaggerConfig != nil && c.EnableSwaggerUI { routes.SwaggerUI{}.Install(s.FallThroughHandler) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go new file mode 100644 index 00000000000..2902f5279c3 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/http/httputil" + "testing" + + "k8s.io/apimachinery/pkg/util/sets" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/server/healthz" + "k8s.io/apiserver/pkg/server/mux" + "k8s.io/client-go/rest" +) + +func TestNewWithDelegate(t *testing.T) { + delegateConfig := NewConfig().WithSerializer(codecs) + delegateConfig.PublicAddress = net.ParseIP("192.168.10.4") + delegateConfig.RequestContextMapper = genericapirequest.NewRequestContextMapper() + delegateConfig.LegacyAPIGroupPrefixes = sets.NewString("/api") + delegateConfig.LoopbackClientConfig = &rest.Config{} + delegateConfig.FallThroughHandler = mux.NewPathRecorderMux() + delegateConfig.SwaggerConfig = DefaultSwaggerConfig() + + delegateHealthzCalled := false + delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error { + delegateHealthzCalled = true + return fmt.Errorf("delegate failed healthcheck") + })) + + delegateConfig.FallThroughHandler.HandleFunc("/foo", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusForbidden) + }) + + delegateServer, err := delegateConfig.SkipComplete().New() + if err != nil { + t.Fatal(err) + } + + delegateServer.AddPostStartHook("delegate-post-start-hook", func(context PostStartHookContext) error { + return nil + }) + + // this wires up swagger + delegateServer.PrepareRun() + + wrappingConfig := NewConfig().WithSerializer(codecs) + wrappingConfig.PublicAddress = net.ParseIP("192.168.10.4") + wrappingConfig.RequestContextMapper = genericapirequest.NewRequestContextMapper() + wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api") + wrappingConfig.LoopbackClientConfig = &rest.Config{} + wrappingConfig.FallThroughHandler = mux.NewPathRecorderMux() + wrappingConfig.SwaggerConfig = DefaultSwaggerConfig() + + wrappingHealthzCalled := false + wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error { + wrappingHealthzCalled = true + return fmt.Errorf("wrapping failed healthcheck") + })) + + wrappingConfig.FallThroughHandler.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + }) + + wrappingServer, err := wrappingConfig.Complete().NewWithDelegate(delegateServer) + if err != nil { + t.Fatal(err) + } + + wrappingServer.AddPostStartHook("wrapping-post-start-hook", func(context PostStartHookContext) error { + return nil + }) + + stopCh := make(chan struct{}) + defer close(stopCh) + wrappingServer.PrepareRun() + wrappingServer.RunPostStartHooks() + + server := httptest.NewServer(wrappingServer.Handler) + defer server.Close() + + checkPath(server.URL, http.StatusOK, `{ + "paths": [ + "/apis", + "/bar", + "/foo", + "/healthz", + "/healthz/delegate-health", + "/healthz/ping", + "/healthz/poststarthook/delegate-post-start-hook", + "/healthz/poststarthook/wrapping-post-start-hook", + "/healthz/wrapping-health", + "/swaggerapi/" + ] +}`, t) + checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok +[-]wrapping-health failed: reason withheld +[-]delegate-health failed: reason withheld +[+]poststarthook/delegate-post-start-hook ok +[+]poststarthook/wrapping-post-start-hook ok +healthz check failed +`, t) + + checkPath(server.URL+"/healthz/delegate-health", http.StatusInternalServerError, `internal server error: delegate failed healthcheck +`, t) + checkPath(server.URL+"/healthz/wrapping-health", http.StatusInternalServerError, `internal server error: wrapping failed healthcheck +`, t) + checkPath(server.URL+"/healthz/poststarthook/delegate-post-start-hook", http.StatusOK, `ok`, t) + checkPath(server.URL+"/healthz/poststarthook/wrapping-post-start-hook", http.StatusOK, `ok`, t) + checkPath(server.URL+"/foo", http.StatusForbidden, ``, t) + checkPath(server.URL+"/bar", http.StatusUnauthorized, ``, t) +} + +func checkPath(url string, expectedStatusCode int, expectedBody string, t *testing.T) { + resp, err := http.Get(url) + if err != nil { + t.Fatal(err) + } + dump, _ := httputil.DumpResponse(resp, true) + t.Log(string(dump)) + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + + if e, a := expectedBody, string(body); e != a { + t.Errorf("%q expected %v, got %v", url, e, a) + } + if e, a := expectedStatusCode, resp.StatusCode; e != a { + t.Errorf("%q expected %v, got %v", url, e, a) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 68e15287295..489367e60e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -130,6 +130,9 @@ type GenericAPIServer struct { // It comes after all filters and the API handling FallThroughHandler *mux.PathRecorderMux + // listedPathProvider is a lister which provides the set of paths to show at / + listedPathProvider routes.ListedPathProvider + // Map storing information about all groups to be exposed in discovery response. // The map is from name to the group. apiGroupsForDiscoveryLock sync.RWMutex @@ -152,6 +155,39 @@ type GenericAPIServer struct { healthzCreated bool } +// DelegationTarget is an interface which allows for composition of API servers with top level handling that works +// as expected. +type DelegationTarget interface { + // UnprotectedHandler returns a handler that is NOT protected by a normal chain + UnprotectedHandler() http.Handler + + // RequestContextMapper returns the existing RequestContextMapper. Because we cannot rewire all existing + // uses of this function, this will be used in any delegating API server + RequestContextMapper() apirequest.RequestContextMapper + + // PostStartHooks returns the post-start hooks that need to be combined + PostStartHooks() map[string]postStartHookEntry + + // HealthzChecks returns the healthz checks that need to be combined + HealthzChecks() []healthz.HealthzChecker + + // ListedPaths returns the paths for supporting an index + ListedPaths() []string +} + +func (s *GenericAPIServer) UnprotectedHandler() http.Handler { + return s.HandlerContainer.ServeMux +} +func (s *GenericAPIServer) PostStartHooks() map[string]postStartHookEntry { + return s.postStartHooks +} +func (s *GenericAPIServer) HealthzChecks() []healthz.HealthzChecker { + return s.healthzChecks +} +func (s *GenericAPIServer) ListedPaths() []string { + return s.listedPathProvider.ListedPaths() +} + func init() { // Send correct mime type for .svg files. // TODO: remove when https://github.com/golang/go/commit/21e47d831bafb59f22b1ea8098f709677ec8ce33 diff --git a/staging/src/k8s.io/apiserver/pkg/server/mux/container.go b/staging/src/k8s.io/apiserver/pkg/server/mux/container.go index 4cd7614f227..f2598e1a269 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/mux/container.go +++ b/staging/src/k8s.io/apiserver/pkg/server/mux/container.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" rt "runtime" + "sort" "github.com/emicklei/go-restful" "github.com/golang/glog" @@ -58,6 +59,18 @@ func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer, default return &c } +// ListedPaths returns the paths of the webservices for listing on /. +func (c *APIContainer) ListedPaths() []string { + var handledPaths []string + // Extract the paths handled using restful.WebService + for _, ws := range c.RegisteredWebServices() { + handledPaths = append(handledPaths, ws.RootPath()) + } + sort.Strings(handledPaths) + + return handledPaths +} + //TODO: Unify with RecoverPanics? func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{}, w http.ResponseWriter) { var buffer bytes.Buffer diff --git a/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go b/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go index 7c1ed2a7d10..3efd232263f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go +++ b/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "runtime/debug" + "sort" utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) @@ -42,9 +43,12 @@ func NewPathRecorderMux() *PathRecorderMux { } } -// HandledPaths returns the registered handler exposedPaths. -func (m *PathRecorderMux) HandledPaths() []string { - return append([]string{}, m.exposedPaths...) +// ListedPaths returns the registered handler exposedPaths. +func (m *PathRecorderMux) ListedPaths() []string { + handledPaths := append([]string{}, m.exposedPaths...) + sort.Strings(handledPaths) + + return handledPaths } // Handle registers the handler for the given pattern. diff --git a/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder_test.go b/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder_test.go index c6865c88600..3d7e6b61081 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder_test.go @@ -27,6 +27,6 @@ func TestSecretHandlers(t *testing.T) { c := NewPathRecorderMux() c.UnlistedHandleFunc("/secret", func(http.ResponseWriter, *http.Request) {}) c.HandleFunc("/nonswagger", func(http.ResponseWriter, *http.Request) {}) - assert.NotContains(t, c.HandledPaths(), "/secret") - assert.Contains(t, c.HandledPaths(), "/nonswagger") + assert.NotContains(t, c.ListedPaths(), "/secret") + assert.Contains(t, c.ListedPaths(), "/nonswagger") } diff --git a/staging/src/k8s.io/apiserver/pkg/server/routes/index.go b/staging/src/k8s.io/apiserver/pkg/server/routes/index.go index b535ff0ac73..2b7c1ea6c46 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/routes/index.go +++ b/staging/src/k8s.io/apiserver/pkg/server/routes/index.go @@ -18,33 +18,52 @@ package routes import ( "net/http" - "sort" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/server/mux" ) +// ListedPathProvider is an interface for providing paths that should be reported at /. +type ListedPathProvider interface { + // ListedPaths is an alphabetically sorted list of paths to be reported at /. + ListedPaths() []string +} + +// ListedPathProviders is a convenient way to combine multiple ListedPathProviders +type ListedPathProviders []ListedPathProvider + +// ListedPaths unions and sorts the included paths. +func (p ListedPathProviders) ListedPaths() []string { + ret := sets.String{} + for _, provider := range p { + for _, path := range provider.ListedPaths() { + ret.Insert(path) + } + } + + return ret.List() +} + // Index provides a webservice for the http root / listing all known paths. type Index struct{} // Install adds the Index webservice to the given mux. -func (i Index) Install(c *mux.APIContainer, mux *mux.PathRecorderMux) { +func (i Index) Install(pathProvider ListedPathProvider, mux *mux.PathRecorderMux, delegate http.Handler) { mux.UnlistedHandleFunc("/", func(w http.ResponseWriter, r *http.Request) { status := http.StatusOK if r.URL.Path != "/" && r.URL.Path != "/index.html" { // Since "/" matches all paths, handleIndex is called for all paths for which there is no handler api.Registry. - // We want to return a 404 status with a list of all valid paths, incase of an invalid URL request. + // if we have a delegate, we should call to it and simply return + if delegate != nil { + delegate.ServeHTTP(w, r) + return + } + + // If we have no delegate, we want to return a 404 status with a list of all valid paths, incase of an invalid URL request. status = http.StatusNotFound } - var handledPaths []string - // Extract the paths handled using restful.WebService - for _, ws := range c.RegisteredWebServices() { - handledPaths = append(handledPaths, ws.RootPath()) - } - // Extract the paths handled using mux handler. - handledPaths = append(handledPaths, mux.HandledPaths()...) - sort.Strings(handledPaths) - responsewriters.WriteRawJSON(status, metav1.RootPaths{Paths: handledPaths}, w) + responsewriters.WriteRawJSON(status, metav1.RootPaths{Paths: pathProvider.ListedPaths()}, w) }) } diff --git a/vendor/BUILD b/vendor/BUILD index bde45731420..2db551e382e 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -10467,7 +10467,10 @@ go_library( go_test( name = "k8s.io/apiserver/pkg/server_test", - srcs = ["k8s.io/apiserver/pkg/server/genericapiserver_test.go"], + srcs = [ + "k8s.io/apiserver/pkg/server/config_test.go", + "k8s.io/apiserver/pkg/server/genericapiserver_test.go", + ], library = ":k8s.io/apiserver/pkg/server", tags = ["automanaged"], deps = [ @@ -10488,6 +10491,7 @@ go_test( "//vendor:k8s.io/apiserver/pkg/authorization/authorizer", "//vendor:k8s.io/apiserver/pkg/endpoints/request", "//vendor:k8s.io/apiserver/pkg/registry/rest", + "//vendor:k8s.io/apiserver/pkg/server/healthz", "//vendor:k8s.io/apiserver/pkg/server/mux", "//vendor:k8s.io/apiserver/pkg/storage/etcd/testing", "//vendor:k8s.io/client-go/rest", @@ -10767,6 +10771,7 @@ go_library( "//vendor:github.com/prometheus/client_golang/prometheus", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/openapi", + "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/version", "//vendor:k8s.io/apiserver/pkg/endpoints/handlers/responsewriters", "//vendor:k8s.io/apiserver/pkg/endpoints/metrics",