MOVE: cmd/kube-apiserver: move genericapiserver construction into controlplane package

This commit is contained in:
Dr. Stefan Schimanski 2023-06-02 11:17:27 +02:00
parent 9be6e7bb33
commit 43be9c3a0a
No known key found for this signature in database
GPG Key ID: 4C68E0F19F95EC33
3 changed files with 251 additions and 202 deletions

View File

@ -30,62 +30,41 @@ import (
"time"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
oteltrace "go.opentelemetry.io/otel/trace"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serveroptions "k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/apiserver/pkg/util/webhook"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/keyutil"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
_ "k8s.io/component-base/metrics/prometheus/workqueue"
"k8s.io/component-base/term"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
"k8s.io/klog/v2"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
netutils "k8s.io/utils/net"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
"k8s.io/kubernetes/pkg/serviceaccount"
)
@ -234,7 +213,7 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
) {
proxyTransport := CreateProxyTransport()
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := controlplaneapiserver.BuildGenericConfig(s.ServerRunOptions, proxyTransport)
if err != nil {
return nil, nil, nil, err
}
@ -328,184 +307,6 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
return config, serviceResolver, pluginInitializers, nil
}
// buildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
pluginInitializers []admission.PluginInitializer,
admissionPostStartHook genericapiserver.PostStartHookFunc,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
return
}
}
// wrap the definitions to revert any changes from disabled features
getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
namer := openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
if genericConfig.EgressSelector != nil {
s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactoryConfig.StorageConfig.StorageObjectCountTracker = genericConfig.StorageObjectCountTracker
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
lastErr = s.Audit.ApplyTo(genericConfig)
if lastErr != nil {
return
}
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
schemaResolver := resolver.NewDefinitionsSchemaResolver(k8sscheme.Scheme, genericConfig.OpenAPIConfig.GetDefinitions)
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider, schemaResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
dynamicExternalClient, err := dynamic.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real dynamic external client: %w", err)
return
}
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
clientgoExternalClient,
dynamicExternalClient,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
}
return
}
// BuildAuthorizer constructs the authorizer
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
if EgressSelector != nil {
egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
if err != nil {
return nil, nil, err
}
authorizationConfig.CustomDial = egressDialer
}
return authorizationConfig.New()
}
// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) {
if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 {
return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight)
}
return utilflowcontrol.New(
versionedInformer,
extclient.FlowcontrolV1beta3(),
s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}
// completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
type completedServerRunOptions struct {
*options.ServerRunOptions
@ -657,6 +458,7 @@ func buildServiceResolver(enabledAggregatorRouting bool, hostname string, inform
informer.Core().V1().Services().Lister(),
)
}
// resolve kubernetes.default.svc locally
if localHost, err := url.Parse(hostname); err == nil {
serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)

View File

@ -0,0 +1,12 @@
rules:
- selectorRegexp: k8s[.]io/kubernetes
allowedPrefixes:
- ''
forbiddenPrefixes:
# prevent pkg/ from depending on cmd/
# note: pkg/kubemark overrides this
# - k8s.io/kubernetes/cmd # temporarily disabled until options are split and moved
# use sigs.k8s.io/yaml instead
- github.com/ghodss/yaml
# prevent kubernetes from opening sctp sockets (ref: https://github.com/kubernetes/kubernetes/pull/87926#discussion_r376642015)
- github.com/ishidawataru/sctp

View File

@ -0,0 +1,235 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"net/http"
"time"
oteltrace "go.opentelemetry.io/otel/trace"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/client-go/dynamic"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/component-base/version"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controlplane"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
)
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
pluginInitializers []admission.PluginInitializer,
admissionPostStartHook genericapiserver.PostStartHookFunc,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
return
}
}
// wrap the definitions to revert any changes from disabled features
getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
namer := openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
if genericConfig.EgressSelector != nil {
s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
lastErr = s.Audit.ApplyTo(genericConfig)
if lastErr != nil {
return
}
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
schemaResolver := resolver.NewDefinitionsSchemaResolver(k8sscheme.Scheme, genericConfig.OpenAPIConfig.GetDefinitions)
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider, schemaResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
dynamicExternalClient, err := dynamic.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real dynamic external client: %w", err)
return
}
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
clientgoExternalClient,
dynamicExternalClient,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
}
return
}
// BuildAuthorizer constructs the authorizer
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
if EgressSelector != nil {
egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
if err != nil {
return nil, nil, err
}
authorizationConfig.CustomDial = egressDialer
}
return authorizationConfig.New()
}
// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) {
if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 {
return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight)
}
return utilflowcontrol.New(
versionedInformer,
extclient.FlowcontrolV1beta3(),
s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}