diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 35be64eef57..e6aa454a543 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -30,62 +30,41 @@ import ( "time" "github.com/spf13/cobra" - "k8s.io/client-go/dynamic" - - oteltrace "go.opentelemetry.io/otel/trace" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" - "k8s.io/apiserver/pkg/authorization/authorizer" - "k8s.io/apiserver/pkg/cel/openapi/resolver" - "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" - openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" - genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" - "k8s.io/apiserver/pkg/server/filters" serveroptions "k8s.io/apiserver/pkg/server/options" - serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" - utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/apiserver/pkg/util/notfoundhandler" - "k8s.io/apiserver/pkg/util/openapi" "k8s.io/apiserver/pkg/util/webhook" clientgoinformers "k8s.io/client-go/informers" - clientgoclientset "k8s.io/client-go/kubernetes" - k8sscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/util/keyutil" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/globalflag" "k8s.io/component-base/logs" logsapi "k8s.io/component-base/logs/api/v1" - _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration + _ "k8s.io/component-base/metrics/prometheus/workqueue" "k8s.io/component-base/term" "k8s.io/component-base/version" "k8s.io/component-base/version/verflag" "k8s.io/klog/v2" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" - aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" netutils "k8s.io/utils/net" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" - "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/controlplane" + controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver" "k8s.io/kubernetes/pkg/controlplane/reconcilers" - generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/kubeapiserver" - kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator" - "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" - rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest" "k8s.io/kubernetes/pkg/serviceaccount" ) @@ -234,7 +213,7 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) ( ) { proxyTransport := CreateProxyTransport() - genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport) + genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := controlplaneapiserver.BuildGenericConfig(s.ServerRunOptions, proxyTransport) if err != nil { return nil, nil, nil, err } @@ -328,184 +307,6 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) ( return config, serviceResolver, pluginInitializers, nil } -// buildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it -func buildGenericConfig( - s *options.ServerRunOptions, - proxyTransport *http.Transport, -) ( - genericConfig *genericapiserver.Config, - versionedInformers clientgoinformers.SharedInformerFactory, - serviceResolver aggregatorapiserver.ServiceResolver, - pluginInitializers []admission.PluginInitializer, - admissionPostStartHook genericapiserver.PostStartHookFunc, - storageFactory *serverstorage.DefaultStorageFactory, - lastErr error, -) { - genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) - genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource() - - if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil { - return - } - - if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil { - return - } - if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil { - return - } - if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil { - return - } - if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil { - return - } - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { - if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil { - return - } - } - // wrap the definitions to revert any changes from disabled features - getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions) - namer := openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme) - genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer) - genericConfig.OpenAPIConfig.Info.Title = "Kubernetes" - genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer) - genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes" - - genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck( - sets.NewString("watch", "proxy"), - sets.NewString("attach", "exec", "proxy", "log", "portforward"), - ) - - kubeVersion := version.Get() - genericConfig.Version = &kubeVersion - - if genericConfig.EgressSelector != nil { - s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup - } - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { - s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider - } else { - s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() - } - - storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() - storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig - storageFactoryConfig.StorageConfig.StorageObjectCountTracker = genericConfig.StorageObjectCountTracker - storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New() - if lastErr != nil { - return - } - if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { - return - } - - // Use protobufs for self-communication. - // Since not every generic apiserver has to support protobufs, we - // cannot default to it in generic apiserver and need to explicitly - // set it in kube-apiserver. - genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf" - // Disable compression for self-communication, since we are going to be - // on a fast local network - genericConfig.LoopbackClientConfig.DisableCompression = true - - kubeClientConfig := genericConfig.LoopbackClientConfig - clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig) - if err != nil { - lastErr = fmt.Errorf("failed to create real external clientset: %v", err) - return - } - versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) - - // Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present - if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil { - return - } - - genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers) - if err != nil { - lastErr = fmt.Errorf("invalid authorization config: %v", err) - return - } - if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) { - genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName) - } - - lastErr = s.Audit.ApplyTo(genericConfig) - if lastErr != nil { - return - } - - admissionConfig := &kubeapiserveradmission.Config{ - ExternalInformers: versionedInformers, - LoopbackClientConfig: genericConfig.LoopbackClientConfig, - CloudConfigFile: s.CloudProvider.CloudConfigFile, - } - serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers) - schemaResolver := resolver.NewDefinitionsSchemaResolver(k8sscheme.Scheme, genericConfig.OpenAPIConfig.GetDefinitions) - pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider, schemaResolver) - if err != nil { - lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err) - return - } - - dynamicExternalClient, err := dynamic.NewForConfig(kubeClientConfig) - if err != nil { - lastErr = fmt.Errorf("failed to create real dynamic external client: %w", err) - return - } - - err = s.Admission.ApplyTo( - genericConfig, - versionedInformers, - clientgoExternalClient, - dynamicExternalClient, - utilfeature.DefaultFeatureGate, - pluginInitializers...) - if err != nil { - lastErr = fmt.Errorf("failed to initialize admission: %v", err) - return - } - - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness { - genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers) - } - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { - genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis") - } - - return -} - -// BuildAuthorizer constructs the authorizer -func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) { - authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers) - - if EgressSelector != nil { - egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext()) - if err != nil { - return nil, nil, err - } - authorizationConfig.CustomDial = egressDialer - } - - return authorizationConfig.New() -} - -// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter -func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) { - if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 { - return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight) - } - return utilflowcontrol.New( - versionedInformer, - extclient.FlowcontrolV1beta3(), - s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight, - s.GenericServerRunOptions.RequestTimeout/4, - ), nil -} - // completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked. type completedServerRunOptions struct { *options.ServerRunOptions @@ -657,6 +458,7 @@ func buildServiceResolver(enabledAggregatorRouting bool, hostname string, inform informer.Core().V1().Services().Lister(), ) } + // resolve kubernetes.default.svc locally if localHost, err := url.Parse(hostname); err == nil { serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost) diff --git a/pkg/controlplane/apiserver/.import-restrictions b/pkg/controlplane/apiserver/.import-restrictions new file mode 100644 index 00000000000..180994c1dc6 --- /dev/null +++ b/pkg/controlplane/apiserver/.import-restrictions @@ -0,0 +1,12 @@ +rules: + - selectorRegexp: k8s[.]io/kubernetes + allowedPrefixes: + - '' + forbiddenPrefixes: + # prevent pkg/ from depending on cmd/ + # note: pkg/kubemark overrides this + # - k8s.io/kubernetes/cmd # temporarily disabled until options are split and moved + # use sigs.k8s.io/yaml instead + - github.com/ghodss/yaml + # prevent kubernetes from opening sctp sockets (ref: https://github.com/kubernetes/kubernetes/pull/87926#discussion_r376642015) + - github.com/ishidawataru/sctp diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go new file mode 100644 index 00000000000..bfacae91209 --- /dev/null +++ b/pkg/controlplane/apiserver/config.go @@ -0,0 +1,235 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "net/http" + "time" + + oteltrace "go.opentelemetry.io/otel/trace" + + extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/cel/openapi/resolver" + "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" + openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" + genericfeatures "k8s.io/apiserver/pkg/features" + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/egressselector" + "k8s.io/apiserver/pkg/server/filters" + serverstorage "k8s.io/apiserver/pkg/server/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + "k8s.io/apiserver/pkg/util/openapi" + "k8s.io/client-go/dynamic" + clientgoinformers "k8s.io/client-go/informers" + clientgoclientset "k8s.io/client-go/kubernetes" + k8sscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/component-base/version" + aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" + aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" + + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + "k8s.io/kubernetes/pkg/api/legacyscheme" + "k8s.io/kubernetes/pkg/controlplane" + generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi" + "k8s.io/kubernetes/pkg/kubeapiserver" + kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" + "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" + rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest" +) + +// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it +func BuildGenericConfig( + s *options.ServerRunOptions, + proxyTransport *http.Transport, +) ( + genericConfig *genericapiserver.Config, + versionedInformers clientgoinformers.SharedInformerFactory, + serviceResolver aggregatorapiserver.ServiceResolver, + pluginInitializers []admission.PluginInitializer, + admissionPostStartHook genericapiserver.PostStartHookFunc, + storageFactory *serverstorage.DefaultStorageFactory, + + lastErr error, +) { + genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) + genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource() + + if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil { + return + } + + if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil { + return + } + if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil { + return + } + if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil { + return + } + if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil { + return + } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { + if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil { + return + } + } + // wrap the definitions to revert any changes from disabled features + getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions) + namer := openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme) + genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer) + genericConfig.OpenAPIConfig.Info.Title = "Kubernetes" + genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer) + genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes" + + genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck( + sets.NewString("watch", "proxy"), + sets.NewString("attach", "exec", "proxy", "log", "portforward"), + ) + + kubeVersion := version.Get() + genericConfig.Version = &kubeVersion + + if genericConfig.EgressSelector != nil { + s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup + } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { + s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider + } else { + s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() + } + + storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() + storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig + storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New() + if lastErr != nil { + return + } + if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { + return + } + + // Use protobufs for self-communication. + // Since not every generic apiserver has to support protobufs, we + // cannot default to it in generic apiserver and need to explicitly + // set it in kube-apiserver. + genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf" + // Disable compression for self-communication, since we are going to be + // on a fast local network + genericConfig.LoopbackClientConfig.DisableCompression = true + + kubeClientConfig := genericConfig.LoopbackClientConfig + clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig) + if err != nil { + lastErr = fmt.Errorf("failed to create real external clientset: %v", err) + return + } + versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) + + // Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present + if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil { + return + } + + genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers) + if err != nil { + lastErr = fmt.Errorf("invalid authorization config: %v", err) + return + } + if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) { + genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName) + } + + lastErr = s.Audit.ApplyTo(genericConfig) + if lastErr != nil { + return + } + + admissionConfig := &kubeapiserveradmission.Config{ + ExternalInformers: versionedInformers, + LoopbackClientConfig: genericConfig.LoopbackClientConfig, + CloudConfigFile: s.CloudProvider.CloudConfigFile, + } + serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers) + schemaResolver := resolver.NewDefinitionsSchemaResolver(k8sscheme.Scheme, genericConfig.OpenAPIConfig.GetDefinitions) + pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider, schemaResolver) + if err != nil { + lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err) + return + } + + dynamicExternalClient, err := dynamic.NewForConfig(kubeClientConfig) + if err != nil { + lastErr = fmt.Errorf("failed to create real dynamic external client: %w", err) + return + } + + err = s.Admission.ApplyTo( + genericConfig, + versionedInformers, + clientgoExternalClient, + dynamicExternalClient, + utilfeature.DefaultFeatureGate, + pluginInitializers...) + if err != nil { + lastErr = fmt.Errorf("failed to initialize admission: %v", err) + return + } + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness { + genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers) + } + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { + genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis") + } + + return +} + +// BuildAuthorizer constructs the authorizer +func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) { + authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers) + + if EgressSelector != nil { + egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext()) + if err != nil { + return nil, nil, err + } + authorizationConfig.CustomDial = egressDialer + } + + return authorizationConfig.New() +} + +// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter +func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) { + if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 { + return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight) + } + return utilflowcontrol.New( + versionedInformer, + extclient.FlowcontrolV1beta3(), + s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight, + s.GenericServerRunOptions.RequestTimeout/4, + ), nil +}