From f82bc712decd41db48cd81aded5ddddf7bca9e16 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Mon, 8 Jul 2019 11:37:00 +0200 Subject: [PATCH] aggregator: wire OpenAPI correctly into PrepareRun flow --- cmd/kube-apiserver/app/server.go | 11 +++- cmd/kube-apiserver/app/testing/testserver.go | 23 ++++--- hack/.golint_failures | 1 + .../kube-aggregator/pkg/apiserver/BUILD | 1 + .../pkg/apiserver/apiserver.go | 60 ++++++++++++++----- test/integration/etcd/server.go | 8 ++- 6 files changed, 76 insertions(+), 28 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 027dbfd7acc..a8b537891ad 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -153,11 +153,16 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro return err } - return server.PrepareRun().Run(stopCh) + prepared, err := server.PrepareRun() + if err != nil { + return err + } + + return prepared.Run(stopCh) } // CreateServerChain creates the apiservers connected via delegation. -func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) { +func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) { nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions) if err != nil { return nil, err @@ -202,7 +207,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan } } - return aggregatorServer.GenericAPIServer, nil + return aggregatorServer, nil } // CreateKubeAPIServer creates and wires a workable kube-apiserver diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 6d3404ed600..6dd6d897c3d 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -147,25 +147,30 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) server, err := app.CreateServerChain(completedOptions, stopCh) - - if instanceOptions.InjectedHealthzChecker != nil { - t.Logf("Adding health check with delay %v %v", s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker.Name()) - server.AddDelayedHealthzChecks(s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker) - } - if err != nil { return result, fmt.Errorf("failed to create server chain: %v", err) } + + if instanceOptions.InjectedHealthzChecker != nil { + t.Logf("Adding health check with delay %v %v", s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker.Name()) + if err := server.GenericAPIServer.AddDelayedHealthzChecks(s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker); err != nil { + return result, err + } + } + errCh := make(chan error) go func(stopCh <-chan struct{}) { - if err := server.PrepareRun().Run(stopCh); err != nil { + prepared, err := server.PrepareRun() + if err != nil { + errCh <- err + } else if err := prepared.Run(stopCh); err != nil { errCh <- err } }(stopCh) t.Logf("Waiting for /healthz to be ok...") - client, err := kubernetes.NewForConfig(server.LoopbackClientConfig) + client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) if err != nil { return result, fmt.Errorf("failed to create a client: %v", err) } @@ -211,7 +216,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } // from here the caller must call tearDown - result.ClientConfig = server.LoopbackClientConfig + result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig result.ClientConfig.QPS = 1000 result.ClientConfig.Burst = 10000 result.ServerOpts = s diff --git a/hack/.golint_failures b/hack/.golint_failures index dec185def3f..833ba16772f 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -535,6 +535,7 @@ staging/src/k8s.io/component-base/featuregate staging/src/k8s.io/cri-api/pkg/apis/testing staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1 staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1 +staging/src/k8s.io/kube-aggregator/pkg/apiserver staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister staging/src/k8s.io/kube-proxy/config/v1alpha1 staging/src/k8s.io/kubectl/pkg/util/templates diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index 4cca019e819..5cd556ab4ce 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -83,6 +83,7 @@ go_library( "//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library", "//vendor/k8s.io/klog:go_default_library", + "//vendor/k8s.io/kube-openapi/pkg/common:go_default_library", ], ) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index d3dc5cf65ba..5186addb9ef 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -26,9 +26,10 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/client-go/pkg/version" + openapicommon "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-aggregator/pkg/apis/apiregistration" - "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset" @@ -89,6 +90,16 @@ type CompletedConfig struct { *completedConfig } +type runnable interface { + Run(stopCh <-chan struct{}) error +} + +// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked. +type preparedAPIAggregator struct { + *APIAggregator + runnable runnable +} + // APIAggregator contains state for a Kubernetes cluster master/api server. type APIAggregator struct { GenericAPIServer *genericapiserver.GenericAPIServer @@ -116,6 +127,10 @@ type APIAggregator struct { // Information needed to determine routing for the aggregator serviceResolver ServiceResolver + // Enable swagger and/or OpenAPI if these configs are non-nil. + openAPIConfig *openapicommon.Config + + // openAPIAggregationController downloads and merges OpenAPI specs. openAPIAggregationController *openapicontroller.AggregationController } @@ -167,6 +182,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), APIRegistrationInformers: informerFactory, serviceResolver: c.ExtraConfig.ServiceResolver, + openAPIConfig: openAPIConfig, } apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter) @@ -211,26 +227,42 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) - if openAPIConfig != nil { - specDownloader := openapiaggregator.NewDownloader() - openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator( - &specDownloader, - delegationTarget, - s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(), - openAPIConfig, - s.GenericAPIServer.Handler.NonGoRestfulMux) - if err != nil { - return nil, err - } - s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator) + return s, nil +} +// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec and calling +// the generic PrepareRun. +func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { + // add post start hook before generic PrepareRun in order to be before /healthz installation + if s.openAPIConfig != nil { s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error { go s.openAPIAggregationController.Run(context.StopCh) return nil }) } - return s, nil + prepared := s.GenericAPIServer.PrepareRun() + + // delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers + if s.openAPIConfig != nil { + specDownloader := openapiaggregator.NewDownloader() + openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator( + &specDownloader, + s.GenericAPIServer.NextDelegate(), + s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(), + s.openAPIConfig, + s.GenericAPIServer.Handler.NonGoRestfulMux) + if err != nil { + return preparedAPIAggregator{}, err + } + s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator) + } + + return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil +} + +func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error { + return s.runnable.Run(stopCh) } // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index 585bd19e330..2d8441b84c7 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -117,7 +117,7 @@ func StartRealMasterOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOp t.Fatal(err) } - kubeClientConfig := restclient.CopyConfig(kubeAPIServer.LoopbackClientConfig) + kubeClientConfig := restclient.CopyConfig(kubeAPIServer.GenericAPIServer.LoopbackClientConfig) // we make lots of requests, don't be slow kubeClientConfig.QPS = 99999 @@ -133,7 +133,11 @@ func StartRealMasterOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOp } }() - if err := kubeAPIServer.PrepareRun().Run(stopCh); err != nil { + prepared, err := kubeAPIServer.PrepareRun() + if err != nil { + t.Fatal(err) + } + if err := prepared.Run(stopCh); err != nil { t.Fatal(err) } }()