Merge pull request #118379 from sttts/sttts-generic-controlplane-1

kube-apiserver/cmd: stratify construction to follow options/config/server pattern
This commit is contained in:
Kubernetes Prow Robot 2023-06-07 04:12:13 -07:00 committed by GitHub
commit c3750e4450
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 403 additions and 261 deletions

View File

@ -116,8 +116,8 @@ func createAggregatorConfig(
return aggregatorConfig, nil
}
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,93 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apiserver/pkg/util/webhook"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/apiserver"
)
type Config struct {
Options completedServerRunOptions
Aggregator *aggregatorapiserver.Config
ControlPlane *controlplane.Config
ApiExtensions *apiextensionsapiserver.Config
ExtraConfig
}
type ExtraConfig struct {
}
type completedConfig struct {
Options completedServerRunOptions
Aggregator aggregatorapiserver.CompletedConfig
ControlPlane controlplane.CompletedConfig
ApiExtensions apiextensionsapiserver.CompletedConfig
ExtraConfig
}
type CompletedConfig struct {
// Embed a private pointer that cannot be instantiated outside of this package.
*completedConfig
}
func (c *Config) Complete() (CompletedConfig, error) {
return CompletedConfig{&completedConfig{
Options: c.Options,
Aggregator: c.Aggregator.Complete(),
ControlPlane: c.ControlPlane.Complete(),
ApiExtensions: c.ApiExtensions.Complete(),
ExtraConfig: c.ExtraConfig,
}}, nil
}
// NewConfig creates all the resources for running kube-apiserver, but runs none of them.
func NewConfig(opts completedServerRunOptions) (*Config, error) {
c := &Config{
Options: opts,
}
controlPlane, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(opts)
if err != nil {
return nil, err
}
c.ControlPlane = controlPlane
apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, pluginInitializer, opts.ServerRunOptions, opts.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(controlPlane.ExtraConfig.ProxyTransport, controlPlane.GenericConfig.EgressSelector, controlPlane.GenericConfig.LoopbackClientConfig, controlPlane.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
c.ApiExtensions = apiExtensions
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.ServerRunOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
c.Aggregator = aggregator
return c, nil
}

View File

@ -30,35 +30,25 @@ import (
"time"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
oteltrace "go.opentelemetry.io/otel/trace"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serveroptions "k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/dynamic"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/keyutil"
@ -66,7 +56,7 @@ import (
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
_ "k8s.io/component-base/metrics/prometheus/workqueue"
"k8s.io/component-base/term"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
@ -79,13 +69,12 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
"k8s.io/kubernetes/pkg/serviceaccount"
)
@ -162,13 +151,21 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
func Run(options completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
server, err := CreateServerChain(completeOptions)
config, err := NewConfig(options)
if err != nil {
return err
}
completed, err := config.Complete()
if err != nil {
return err
}
server, err := CreateServerChain(completed)
if err != nil {
return err
}
@ -182,37 +179,21 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro
}
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
crdAPIEnabled := apiExtensionsConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
@ -221,11 +202,6 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
return aggregatorServer, nil
}
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
return kubeAPIServerConfig.Complete().New(delegateAPIServer)
}
// CreateProxyTransport creates the dialer infrastructure to connect to the nodes.
func CreateProxyTransport() *http.Transport {
var proxyDialerFn utilnet.DialFunc
@ -247,7 +223,11 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
) {
proxyTransport := CreateProxyTransport()
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
s.ServerRunOptions,
[]*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
generatedopenapi.GetOpenAPIDefinitions,
)
if err != nil {
return nil, nil, nil, err
}
@ -305,6 +285,36 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
}
// setup admission
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver := buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
schemaResolver := resolver.NewDefinitionsSchemaResolver(k8sscheme.Scheme, genericConfig.OpenAPIConfig.GetDefinitions)
pluginInitializers, admissionPostStartHook, err := admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider, schemaResolver)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}
clientgoExternalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create real client-go external client: %w", err)
}
dynamicExternalClient, err := dynamic.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create real dynamic external client: %w", err)
}
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
clientgoExternalClient,
dynamicExternalClient,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to apply admission: %w", err)
}
if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
return nil, nil, nil, err
}
@ -324,7 +334,7 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
config.ExtraConfig.ProxyTransport = c
}
// Load the public keys.
// Load and set the public keys.
var pubKeys []interface{}
for _, f := range s.Authentication.ServiceAccounts.KeyFiles {
keys, err := keyutil.PublicKeysFromFile(f)
@ -333,7 +343,6 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
}
pubKeys = append(pubKeys, keys...)
}
// Plumb the required metadata through ExtraConfig.
config.ExtraConfig.ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuers[0]
config.ExtraConfig.ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
@ -341,184 +350,6 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
return config, serviceResolver, pluginInitializers, nil
}
// buildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
pluginInitializers []admission.PluginInitializer,
admissionPostStartHook genericapiserver.PostStartHookFunc,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
return
}
}
// wrap the definitions to revert any changes from disabled features
getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
namer := openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
if genericConfig.EgressSelector != nil {
s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactoryConfig.StorageConfig.StorageObjectCountTracker = genericConfig.StorageObjectCountTracker
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
lastErr = s.Audit.ApplyTo(genericConfig)
if lastErr != nil {
return
}
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
schemaResolver := resolver.NewDefinitionsSchemaResolver(k8sscheme.Scheme, genericConfig.OpenAPIConfig.GetDefinitions)
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider, schemaResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
dynamicExternalClient, err := dynamic.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real dynamic external client: %w", err)
return
}
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
clientgoExternalClient,
dynamicExternalClient,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
}
return
}
// BuildAuthorizer constructs the authorizer
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
if EgressSelector != nil {
egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
if err != nil {
return nil, nil, err
}
authorizationConfig.CustomDial = egressDialer
}
return authorizationConfig.New()
}
// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) {
if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 {
return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight)
}
return utilflowcontrol.New(
versionedInformer,
extclient.FlowcontrolV1beta3(),
s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}
// completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
type completedServerRunOptions struct {
*options.ServerRunOptions
@ -670,6 +501,7 @@ func buildServiceResolver(enabledAggregatorRouting bool, hostname string, inform
informer.Core().V1().Services().Lister(),
)
}
// resolve kubernetes.default.svc locally
if localHost, err := url.Parse(hostname); err == nil {
serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)

View File

@ -246,7 +246,16 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig)
t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
server, err := app.CreateServerChain(completedOptions)
config, err := app.NewConfig(completedOptions)
if err != nil {
return result, err
}
completed, err := config.Complete()
if err != nil {
return result, err
}
server, err := app.CreateServerChain(completed)
if err != nil {
return result, fmt.Errorf("failed to create server chain: %v", err)
}

View File

@ -0,0 +1,12 @@
rules:
- selectorRegexp: k8s[.]io/kubernetes
allowedPrefixes:
- ''
forbiddenPrefixes:
# prevent pkg/ from depending on cmd/
# note: pkg/kubemark overrides this
# - k8s.io/kubernetes/cmd # temporarily disabled until options are split and moved
# use sigs.k8s.io/yaml instead
- github.com/ghodss/yaml
# prevent kubernetes from opening sctp sockets (ref: https://github.com/kubernetes/kubernetes/pull/87926#discussion_r376642015)
- github.com/ishidawataru/sctp

View File

@ -1,5 +1,5 @@
/*
Copyright 2017 The Kubernetes Authors.
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,13 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to create a Kubernetes
// APIServer by binding together the API, master and APIServer infrastructure.
// It can be configured and called directly or via the hyperkube framework.
package app
package apiserver
import (
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
apiextensionsoptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
@ -28,16 +25,17 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
func createAPIExtensionsConfig(
kubeAPIServerConfig genericapiserver.Config,
externalInformers kubeexternalinformers.SharedInformerFactory,
func CreateAPIExtensionsConfig(
kubeAPIServerConfig server.Config,
kubeInformers informers.SharedInformerFactory,
pluginInitializers []admission.PluginInitializer,
commandOptions *options.ServerRunOptions,
masterCount int,
@ -47,14 +45,14 @@ func createAPIExtensionsConfig(
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
genericConfig := kubeAPIServerConfig
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
genericConfig.PostStartHooks = map[string]server.PostStartHookConfigEntry{}
genericConfig.RESTOptionsGetter = nil
// copy the etcd options so we don't mutate originals.
// we assume that the etcd options have been completed already. avoid messing with anything outside
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
etcdOptions.StorageConfig.Paging = feature.DefaultFeatureGate.Enabled(features.APIListChunking)
// this is where the true decodable levels come from.
etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)
// prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored
@ -72,9 +70,9 @@ func createAPIExtensionsConfig(
return nil, err
}
apiextensionsConfig := &apiextensionsapiserver.Config{
GenericConfig: &genericapiserver.RecommendedConfig{
GenericConfig: &server.RecommendedConfig{
Config: genericConfig,
SharedInformerFactory: externalInformers,
SharedInformerFactory: kubeInformers,
},
ExtraConfig: apiextensionsapiserver.ExtraConfig{
CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions, genericConfig.ResourceTransformers, genericConfig.StorageObjectCountTracker),
@ -85,11 +83,7 @@ func createAPIExtensionsConfig(
}
// we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
apiextensionsConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
apiextensionsConfig.GenericConfig.PostStartHooks = map[string]server.PostStartHookConfigEntry{}
return apiextensionsConfig, nil
}
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}

View File

@ -0,0 +1,195 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"time"
oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/openapi"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/version"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/kubeapiserver"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
)
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(
s *options.ServerRunOptions,
schemes []*runtime.Scheme,
getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
return
}
}
// wrap the definitions to revert any changes from disabled features
getOpenAPIDefinitions = openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(getOpenAPIDefinitions)
namer := openapinamer.NewDefinitionNamer(schemes...)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
if genericConfig.EgressSelector != nil {
s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
lastErr = s.Audit.ApplyTo(genericConfig)
if lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
}
return
}
// BuildAuthorizer constructs the authorizer
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
if EgressSelector != nil {
egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
if err != nil {
return nil, nil, err
}
authorizationConfig.CustomDial = egressDialer
}
return authorizationConfig.New()
}
// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) {
if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 {
return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight)
}
return utilflowcontrol.New(
versionedInformer,
extclient.FlowcontrolV1beta3(),
s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}

View File

@ -124,7 +124,15 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
t.Fatal(err)
}
kubeAPIServer, err := app.CreateServerChain(completedOptions)
config, err := app.NewConfig(completedOptions)
if err != nil {
t.Fatal(err)
}
completed, err := config.Complete()
if err != nil {
t.Fatal(err)
}
kubeAPIServer, err := app.CreateServerChain(completed)
if err != nil {
t.Fatal(err)
}

View File

@ -73,13 +73,13 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
}
t.Cleanup(func() { etcd3Client.Close() })
// Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync
t.Log("Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync")
bogusCRDEtcdPath := path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/bogus")
if _, err := etcd3Client.KV.Put(ctx, bogusCRDEtcdPath, `bogus data`); err != nil {
t.Fatal(err)
}
// Populate a valid CRD and managed APIService in etcd
t.Log("Populate a valid CRD and managed APIService in etcd")
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/widgets.valid.example.com"),
@ -155,7 +155,7 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
t.Fatal(err)
}
// Populate a stale managed APIService in etcd
t.Log("Populate a stale managed APIService in etcd")
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.stale.example.com"),
@ -179,7 +179,7 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
t.Fatal(err)
}
// Start server
t.Log("Starting server")
options := kastesting.NewDefaultTestServerOptions()
options.SkipHealthzCheck = true
testServer := kastesting.StartTestServerOrDie(t, options, nil, etcdConfig)
@ -188,7 +188,7 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
// ensure both APIService objects remain
t.Log("Ensure both APIService objects remain")
for i := 0; i < 10; i++ {
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
@ -199,13 +199,12 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
time.Sleep(time.Second)
}
// Clear the bogus CRD data so the informer can sync
t.Log("Clear the bogus CRD data so the informer can sync")
if _, err := etcd3Client.KV.Delete(ctx, bogusCRDEtcdPath); err != nil {
t.Fatal(err)
}
t.Log("cleaned up bogus CRD data")
// ensure the stale APIService object is cleaned up
t.Log("Ensure the stale APIService object is cleaned up")
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
_, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{})
if err == nil {
@ -220,7 +219,7 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
t.Fatal(err)
}
// ensure the valid APIService object remains
t.Log("Ensure the valid APIService object remains")
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {

View File

@ -167,7 +167,7 @@ func StartTestServer(ctx context.Context, t testing.TB, setup TestServerSetup) (
if setup.ModifyServerConfig != nil {
setup.ModifyServerConfig(kubeAPIServerConfig)
}
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate())
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(genericapiserver.NewEmptyDelegate())
if err != nil {
t.Fatal(err)
}