STRUCTURE: cmd/kube-apiserver/app: stratify construction with options/config/server

This commit is contained in:
Dr. Stefan Schimanski 2023-06-01 17:04:05 +02:00
parent e6ed4c7934
commit 9be6e7bb33
No known key found for this signature in database
GPG Key ID: 4C68E0F19F95EC33
5 changed files with 130 additions and 29 deletions

View File

@ -116,8 +116,8 @@ func createAggregatorConfig(
return aggregatorConfig, nil
}
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,93 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apiserver/pkg/util/webhook"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/apiserver"
)
type Config struct {
Options completedServerRunOptions
Aggregator *aggregatorapiserver.Config
ControlPlane *controlplane.Config
ApiExtensions *apiextensionsapiserver.Config
ExtraConfig
}
type ExtraConfig struct {
}
type completedConfig struct {
Options completedServerRunOptions
Aggregator aggregatorapiserver.CompletedConfig
ControlPlane controlplane.CompletedConfig
ApiExtensions apiextensionsapiserver.CompletedConfig
ExtraConfig
}
type CompletedConfig struct {
// Embed a private pointer that cannot be instantiated outside of this package.
*completedConfig
}
func (c *Config) Complete() (CompletedConfig, error) {
return CompletedConfig{&completedConfig{
Options: c.Options,
Aggregator: c.Aggregator.Complete(),
ControlPlane: c.ControlPlane.Complete(),
ApiExtensions: c.ApiExtensions.Complete(),
ExtraConfig: c.ExtraConfig,
}}, nil
}
// NewConfig creates all the resources for running kube-apiserver, but runs none of them.
func NewConfig(opts completedServerRunOptions) (*Config, error) {
c := &Config{
Options: opts,
}
controlPlane, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(opts)
if err != nil {
return nil, err
}
c.ControlPlane = controlPlane
apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, pluginInitializer, opts.ServerRunOptions, opts.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(controlPlane.ExtraConfig.ProxyTransport, controlPlane.GenericConfig.EgressSelector, controlPlane.GenericConfig.LoopbackClientConfig, controlPlane.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
c.ApiExtensions = apiExtensions
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.ServerRunOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
c.Aggregator = aggregator
return c, nil
}

View File

@ -79,7 +79,6 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
@ -163,13 +162,21 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
func Run(options completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
server, err := CreateServerChain(completeOptions)
config, err := NewConfig(options)
if err != nil {
return err
}
completed, err := config.Complete()
if err != nil {
return err
}
server, err := CreateServerChain(completed)
if err != nil {
return err
}
@ -183,37 +190,21 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro
}
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := apiserver.CreateAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
crdAPIEnabled := apiExtensionsConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := apiExtensionsConfig.Complete().New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(apiExtensionsServer.GenericAPIServer)
kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err

View File

@ -246,7 +246,16 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig)
t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
server, err := app.CreateServerChain(completedOptions)
config, err := app.NewConfig(completedOptions)
if err != nil {
return result, err
}
completed, err := config.Complete()
if err != nil {
return result, err
}
server, err := app.CreateServerChain(completed)
if err != nil {
return result, fmt.Errorf("failed to create server chain: %v", err)
}

View File

@ -124,7 +124,15 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
t.Fatal(err)
}
kubeAPIServer, err := app.CreateServerChain(completedOptions)
config, err := app.NewConfig(completedOptions)
if err != nil {
t.Fatal(err)
}
completed, err := config.Complete()
if err != nil {
t.Fatal(err)
}
kubeAPIServer, err := app.CreateServerChain(completed)
if err != nil {
t.Fatal(err)
}