aggregator: wire OpenAPI correctly into PrepareRun flow

This commit is contained in:
Dr. Stefan Schimanski 2019-07-08 11:37:00 +02:00
parent 7c4329ed45
commit f82bc712de
6 changed files with 76 additions and 28 deletions

View File

@ -153,11 +153,16 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro
return err return err
} }
return server.PrepareRun().Run(stopCh) prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
} }
// CreateServerChain creates the apiservers connected via delegation. // CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) { func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions) nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil { if err != nil {
return nil, err return nil, err
@ -202,7 +207,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
} }
} }
return aggregatorServer.GenericAPIServer, nil return aggregatorServer, nil
} }
// CreateKubeAPIServer creates and wires a workable kube-apiserver // CreateKubeAPIServer creates and wires a workable kube-apiserver

View File

@ -147,25 +147,30 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig) t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig)
t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
server, err := app.CreateServerChain(completedOptions, stopCh) server, err := app.CreateServerChain(completedOptions, stopCh)
if instanceOptions.InjectedHealthzChecker != nil {
t.Logf("Adding health check with delay %v %v", s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker.Name())
server.AddDelayedHealthzChecks(s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker)
}
if err != nil { if err != nil {
return result, fmt.Errorf("failed to create server chain: %v", err) return result, fmt.Errorf("failed to create server chain: %v", err)
} }
if instanceOptions.InjectedHealthzChecker != nil {
t.Logf("Adding health check with delay %v %v", s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker.Name())
if err := server.GenericAPIServer.AddDelayedHealthzChecks(s.GenericServerRunOptions.MaxStartupSequenceDuration, instanceOptions.InjectedHealthzChecker); err != nil {
return result, err
}
}
errCh := make(chan error) errCh := make(chan error)
go func(stopCh <-chan struct{}) { go func(stopCh <-chan struct{}) {
if err := server.PrepareRun().Run(stopCh); err != nil { prepared, err := server.PrepareRun()
if err != nil {
errCh <- err
} else if err := prepared.Run(stopCh); err != nil {
errCh <- err errCh <- err
} }
}(stopCh) }(stopCh)
t.Logf("Waiting for /healthz to be ok...") t.Logf("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(server.LoopbackClientConfig) client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil { if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err) return result, fmt.Errorf("failed to create a client: %v", err)
} }
@ -211,7 +216,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
} }
// from here the caller must call tearDown // from here the caller must call tearDown
result.ClientConfig = server.LoopbackClientConfig result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig
result.ClientConfig.QPS = 1000 result.ClientConfig.QPS = 1000
result.ClientConfig.Burst = 10000 result.ClientConfig.Burst = 10000
result.ServerOpts = s result.ServerOpts = s

View File

@ -535,6 +535,7 @@ staging/src/k8s.io/component-base/featuregate
staging/src/k8s.io/cri-api/pkg/apis/testing staging/src/k8s.io/cri-api/pkg/apis/testing
staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1 staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1
staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1 staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1
staging/src/k8s.io/kube-aggregator/pkg/apiserver
staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister
staging/src/k8s.io/kube-proxy/config/v1alpha1 staging/src/k8s.io/kube-proxy/config/v1alpha1
staging/src/k8s.io/kubectl/pkg/util/templates staging/src/k8s.io/kubectl/pkg/util/templates

View File

@ -83,6 +83,7 @@ go_library(
"//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/rest:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
], ],
) )

View File

@ -26,9 +26,10 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage" serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/client-go/pkg/version" "k8s.io/client-go/pkg/version"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-aggregator/pkg/apis/apiregistration" "k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset" "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset"
@ -89,6 +90,16 @@ type CompletedConfig struct {
*completedConfig *completedConfig
} }
type runnable interface {
Run(stopCh <-chan struct{}) error
}
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
type preparedAPIAggregator struct {
*APIAggregator
runnable runnable
}
// APIAggregator contains state for a Kubernetes cluster master/api server. // APIAggregator contains state for a Kubernetes cluster master/api server.
type APIAggregator struct { type APIAggregator struct {
GenericAPIServer *genericapiserver.GenericAPIServer GenericAPIServer *genericapiserver.GenericAPIServer
@ -116,6 +127,10 @@ type APIAggregator struct {
// Information needed to determine routing for the aggregator // Information needed to determine routing for the aggregator
serviceResolver ServiceResolver serviceResolver ServiceResolver
// Enable swagger and/or OpenAPI if these configs are non-nil.
openAPIConfig *openapicommon.Config
// openAPIAggregationController downloads and merges OpenAPI specs.
openAPIAggregationController *openapicontroller.AggregationController openAPIAggregationController *openapicontroller.AggregationController
} }
@ -167,6 +182,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
APIRegistrationInformers: informerFactory, APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver, serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: openAPIConfig,
} }
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter) apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
@ -211,26 +227,42 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil return nil
}) })
if openAPIConfig != nil { return s, nil
specDownloader := openapiaggregator.NewDownloader() }
openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
&specDownloader,
delegationTarget,
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
openAPIConfig,
s.GenericAPIServer.Handler.NonGoRestfulMux)
if err != nil {
return nil, err
}
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec and calling
// the generic PrepareRun.
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// add post start hook before generic PrepareRun in order to be before /healthz installation
if s.openAPIConfig != nil {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error { s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh) go s.openAPIAggregationController.Run(context.StopCh)
return nil return nil
}) })
} }
return s, nil prepared := s.GenericAPIServer.PrepareRun()
// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers
if s.openAPIConfig != nil {
specDownloader := openapiaggregator.NewDownloader()
openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
&specDownloader,
s.GenericAPIServer.NextDelegate(),
s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
s.openAPIConfig,
s.GenericAPIServer.Handler.NonGoRestfulMux)
if err != nil {
return preparedAPIAggregator{}, err
}
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
}
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
} }
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.

View File

@ -117,7 +117,7 @@ func StartRealMasterOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOp
t.Fatal(err) t.Fatal(err)
} }
kubeClientConfig := restclient.CopyConfig(kubeAPIServer.LoopbackClientConfig) kubeClientConfig := restclient.CopyConfig(kubeAPIServer.GenericAPIServer.LoopbackClientConfig)
// we make lots of requests, don't be slow // we make lots of requests, don't be slow
kubeClientConfig.QPS = 99999 kubeClientConfig.QPS = 99999
@ -133,7 +133,11 @@ func StartRealMasterOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOp
} }
}() }()
if err := kubeAPIServer.PrepareRun().Run(stopCh); err != nil { prepared, err := kubeAPIServer.PrepareRun()
if err != nil {
t.Fatal(err)
}
if err := prepared.Run(stopCh); err != nil {
t.Fatal(err) t.Fatal(err)
} }
}() }()