mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
break kube-apiserver start into stages
This commit is contained in:
parent
681ba3a6d3
commit
4acd751101
@ -69,6 +69,8 @@ go_library(
|
|||||||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||||
"//vendor:k8s.io/apiserver/pkg/admission",
|
"//vendor:k8s.io/apiserver/pkg/admission",
|
||||||
|
"//vendor:k8s.io/apiserver/pkg/authentication/authenticator",
|
||||||
|
"//vendor:k8s.io/apiserver/pkg/authorization/authorizer",
|
||||||
"//vendor:k8s.io/apiserver/pkg/server",
|
"//vendor:k8s.io/apiserver/pkg/server",
|
||||||
"//vendor:k8s.io/apiserver/pkg/server/filters",
|
"//vendor:k8s.io/apiserver/pkg/server/filters",
|
||||||
"//vendor:k8s.io/apiserver/pkg/server/storage",
|
"//vendor:k8s.io/apiserver/pkg/server/storage",
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -45,6 +46,8 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/admission"
|
"k8s.io/apiserver/pkg/admission"
|
||||||
|
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||||
|
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
"k8s.io/apiserver/pkg/server/filters"
|
"k8s.io/apiserver/pkg/server/filters"
|
||||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||||
@ -113,60 +116,25 @@ func RunServer(config *master.Config, sharedInformers informers.SharedInformerFa
|
|||||||
|
|
||||||
// BuildMasterConfig creates all the resources for running the API server, but runs none of them
|
// BuildMasterConfig creates all the resources for running the API server, but runs none of them
|
||||||
func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
|
func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
|
||||||
// set defaults
|
// set defaults in the options before trying to create the generic config
|
||||||
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
|
if err := defaultOptions(s); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("error determining service IP ranges: %v", err)
|
|
||||||
}
|
|
||||||
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("error creating self-signed certificates: %v", err)
|
|
||||||
}
|
|
||||||
if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("error setting the external host value: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Authentication.ApplyAuthorization(s.Authorization)
|
|
||||||
|
|
||||||
// validate options
|
// validate options
|
||||||
if errs := s.Validate(); len(errs) != 0 {
|
if errs := s.Validate(); len(errs) != 0 {
|
||||||
return nil, nil, utilerrors.NewAggregate(errs)
|
return nil, nil, utilerrors.NewAggregate(errs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create config from options
|
genericConfig, sharedInformers, err := BuildGenericConfig(s)
|
||||||
genericConfig := genericapiserver.NewConfig().
|
if err != nil {
|
||||||
WithSerializer(api.Codecs)
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if err := s.InsecureServing.ApplyTo(genericConfig); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if err := s.Audit.ApplyTo(genericConfig); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if err := s.Features.ApplyTo(genericConfig); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
|
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
|
||||||
return nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
|
return nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use protobufs for self-communication.
|
|
||||||
// Since not every generic apiserver has to support protobufs, we
|
|
||||||
// cannot default to it in generic apiserver and need to explicitly
|
|
||||||
// set it in kube-apiserver.
|
|
||||||
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
|
|
||||||
|
|
||||||
capabilities.Initialize(capabilities.Capabilities{
|
capabilities.Initialize(capabilities.Capabilities{
|
||||||
AllowPrivileged: s.AllowPrivileged,
|
AllowPrivileged: s.AllowPrivileged,
|
||||||
// TODO(vmarmol): Implement support for HostNetworkSources.
|
// TODO(vmarmol): Implement support for HostNetworkSources.
|
||||||
@ -214,164 +182,20 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
|
|||||||
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
|
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
|
||||||
proxyDialerFn = nodeTunneler.Dial
|
proxyDialerFn = nodeTunneler.Dial
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
|
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
|
||||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
||||||
|
|
||||||
if s.Etcd.StorageConfig.DeserializationCacheSize == 0 {
|
|
||||||
// When size of cache is not explicitly set, estimate its size based on
|
|
||||||
// target memory usage.
|
|
||||||
glog.V(2).Infof("Initializing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
|
||||||
|
|
||||||
// This is the heuristics that from memory capacity is trying to infer
|
|
||||||
// the maximum number of nodes in the cluster and set cache sizes based
|
|
||||||
// on that value.
|
|
||||||
// From our documentation, we officially recomment 120GB machines for
|
|
||||||
// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
|
|
||||||
// capacity per node.
|
|
||||||
// TODO: We may consider deciding that some percentage of memory will
|
|
||||||
// be used for the deserialization cache and divide it by the max object
|
|
||||||
// size to compute its size. We may even go further and measure
|
|
||||||
// collective sizes of the objects in the cache.
|
|
||||||
clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
|
|
||||||
s.Etcd.StorageConfig.DeserializationCacheSize = 25 * clusterSize
|
|
||||||
if s.Etcd.StorageConfig.DeserializationCacheSize < 1000 {
|
|
||||||
s.Etcd.StorageConfig.DeserializationCacheSize = 1000
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("error generating storage version map: %s", err)
|
|
||||||
}
|
|
||||||
storageFactory, err := kubeapiserver.NewStorageFactory(
|
|
||||||
s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs,
|
|
||||||
serverstorage.NewDefaultResourceEncodingConfig(api.Registry), storageGroupsToEncodingVersion,
|
|
||||||
// FIXME: this GroupVersionResource override should be configurable
|
|
||||||
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
|
|
||||||
master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("error in initializing storage factory: %s", err)
|
|
||||||
}
|
|
||||||
// keep Deployments in extensions for backwards compatibility, we'll have to migrate at some point, eventually
|
|
||||||
storageFactory.AddCohabitatingResources(extensions.Resource("deployments"), apps.Resource("deployments"))
|
|
||||||
for _, override := range s.Etcd.EtcdServersOverrides {
|
|
||||||
tokens := strings.Split(override, "#")
|
|
||||||
if len(tokens) != 2 {
|
|
||||||
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
apiresource := strings.Split(tokens[0], "/")
|
|
||||||
if len(apiresource) != 2 {
|
|
||||||
glog.Errorf("invalid resource definition: %s", tokens[0])
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
group := apiresource[0]
|
|
||||||
resource := apiresource[1]
|
|
||||||
groupResource := schema.GroupResource{Group: group, Resource: resource}
|
|
||||||
|
|
||||||
servers := strings.Split(tokens[1], ";")
|
|
||||||
storageFactory.SetEtcdLocation(groupResource, servers)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Default to the private server key for service account token signing
|
|
||||||
if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
|
|
||||||
if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
|
|
||||||
s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
|
|
||||||
} else {
|
|
||||||
glog.Warning("No TLS key provided, service account token authentication disabled")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
authenticatorConfig := s.Authentication.ToAuthenticationConfig()
|
|
||||||
if s.Authentication.ServiceAccounts.Lookup {
|
|
||||||
// If we need to look up service accounts and tokens,
|
|
||||||
// go directly to etcd to avoid recursive auth insanity
|
|
||||||
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("unable to get serviceaccounts storage: %v", err)
|
|
||||||
}
|
|
||||||
authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig)
|
|
||||||
if err != nil {
|
|
||||||
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
|
|
||||||
if len(kubeAPIVersions) == 0 {
|
|
||||||
return nil, nil, fmt.Errorf("failed to create clientset: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
|
|
||||||
// groups. This leads to a nil client above and undefined behaviour further down.
|
|
||||||
//
|
|
||||||
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
|
|
||||||
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
|
|
||||||
}
|
|
||||||
|
|
||||||
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
|
|
||||||
|
|
||||||
if client == nil {
|
|
||||||
// TODO: Remove check once client can never be nil.
|
|
||||||
glog.Errorf("Failed to setup bootstrap token authenticator because the loopback clientset was not setup properly.")
|
|
||||||
} else {
|
|
||||||
authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator(
|
|
||||||
sharedInformers.Core().InternalVersion().Secrets().Lister().Secrets(v1.NamespaceSystem),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
apiAuthenticator, securityDefinitions, err := authenticatorConfig.New()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("invalid authentication config: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
|
|
||||||
apiAuthorizer, err := authorizationConfig.New()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("invalid Authorization Config: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
|
|
||||||
var cloudConfig []byte
|
|
||||||
|
|
||||||
if s.CloudProvider.CloudConfigFile != "" {
|
|
||||||
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
|
|
||||||
if err != nil {
|
|
||||||
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
|
|
||||||
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("failed to read plugin config: %v", err)
|
|
||||||
}
|
|
||||||
admissionController, err := admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("failed to initialize plugins: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
|
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
|
||||||
Dial: proxyDialerFn,
|
Dial: proxyDialerFn,
|
||||||
TLSClientConfig: proxyTLSClientConfig,
|
TLSClientConfig: proxyTLSClientConfig,
|
||||||
})
|
})
|
||||||
kubeVersion := version.Get()
|
|
||||||
|
|
||||||
genericConfig.Version = &kubeVersion
|
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
|
||||||
genericConfig.Authenticator = apiAuthenticator
|
if err != nil {
|
||||||
genericConfig.Authorizer = apiAuthorizer
|
return nil, nil, err
|
||||||
genericConfig.AdmissionControl = admissionController
|
}
|
||||||
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
|
|
||||||
genericConfig.OpenAPIConfig.PostProcessSpec = postProcessOpenAPISpecForBackwardCompatibility
|
|
||||||
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
|
|
||||||
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
|
|
||||||
genericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
|
|
||||||
genericConfig.EnableMetrics = true
|
|
||||||
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
|
|
||||||
sets.NewString("watch", "proxy"),
|
|
||||||
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
|
|
||||||
)
|
|
||||||
|
|
||||||
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
|
storageFactory, err := BuildStorageFactory(s)
|
||||||
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -383,6 +207,7 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
config := &master.Config{
|
config := &master.Config{
|
||||||
GenericConfig: genericConfig,
|
GenericConfig: genericConfig,
|
||||||
|
|
||||||
@ -416,13 +241,235 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
|
|||||||
MasterCount: s.MasterCount,
|
MasterCount: s.MasterCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return config, sharedInformers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
|
||||||
|
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, error) {
|
||||||
|
genericConfig := genericapiserver.NewConfig().WithSerializer(api.Codecs)
|
||||||
|
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if err := s.InsecureServing.ApplyTo(genericConfig); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if err := s.Audit.ApplyTo(genericConfig); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if err := s.Features.ApplyTo(genericConfig); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
|
||||||
|
genericConfig.OpenAPIConfig.PostProcessSpec = postProcessOpenAPISpecForBackwardCompatibility
|
||||||
|
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
|
||||||
|
genericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
|
||||||
|
genericConfig.EnableMetrics = true
|
||||||
|
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
|
||||||
|
sets.NewString("watch", "proxy"),
|
||||||
|
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
|
||||||
|
)
|
||||||
|
|
||||||
|
kubeVersion := version.Get()
|
||||||
|
genericConfig.Version = &kubeVersion
|
||||||
|
|
||||||
|
storageFactory, err := BuildStorageFactory(s)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use protobufs for self-communication.
|
||||||
|
// Since not every generic apiserver has to support protobufs, we
|
||||||
|
// cannot default to it in generic apiserver and need to explicitly
|
||||||
|
// set it in kube-apiserver.
|
||||||
|
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
|
||||||
|
|
||||||
|
client, err := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig)
|
||||||
|
if err != nil {
|
||||||
|
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
|
||||||
|
if len(kubeAPIVersions) == 0 {
|
||||||
|
return nil, nil, fmt.Errorf("failed to create clientset: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
|
||||||
|
// groups. This leads to a nil client above and undefined behaviour further down.
|
||||||
|
//
|
||||||
|
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
|
||||||
|
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
|
||||||
|
}
|
||||||
|
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
|
||||||
|
|
||||||
|
genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("invalid authentication config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("invalid authorization config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
genericConfig.AdmissionControl, err = BuildAdmission(s, client, sharedInformers, genericConfig.Authorizer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return genericConfig, sharedInformers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildAdmission constructs the admission chain
|
||||||
|
func BuildAdmission(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.Interface, error) {
|
||||||
|
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
|
||||||
|
var cloudConfig []byte
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if s.CloudProvider.CloudConfigFile != "" {
|
||||||
|
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
|
||||||
|
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read plugin config: %v", err)
|
||||||
|
}
|
||||||
|
return admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildAuthenticator constructs the authenticator
|
||||||
|
func BuildAuthenticator(s *options.ServerRunOptions, storageFactory serverstorage.StorageFactory, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory) (authenticator.Request, *spec.SecurityDefinitions, error) {
|
||||||
|
authenticatorConfig := s.Authentication.ToAuthenticationConfig()
|
||||||
|
if s.Authentication.ServiceAccounts.Lookup {
|
||||||
|
// we have to go direct to storage because the clientsets fail when they're initialized with some API versions excluded
|
||||||
|
// we should stop trying to control them like that.
|
||||||
|
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("unable to get serviceaccounts storage: %v", err)
|
||||||
|
}
|
||||||
|
authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
|
||||||
|
}
|
||||||
|
if client == nil || reflect.ValueOf(client).IsNil() {
|
||||||
|
// TODO: Remove check once client can never be nil.
|
||||||
|
glog.Errorf("Failed to setup bootstrap token authenticator because the loopback clientset was not setup properly.")
|
||||||
|
} else {
|
||||||
|
authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator(
|
||||||
|
sharedInformers.Core().InternalVersion().Secrets().Lister().Secrets(v1.NamespaceSystem),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return authenticatorConfig.New()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildAuthorizer constructs the authorizer
|
||||||
|
func BuildAuthorizer(s *options.ServerRunOptions, sharedInformers informers.SharedInformerFactory) (authorizer.Authorizer, error) {
|
||||||
|
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
|
||||||
|
return authorizationConfig.New()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildStorageFactory constructs the storage factory
|
||||||
|
func BuildStorageFactory(s *options.ServerRunOptions) (*serverstorage.DefaultStorageFactory, error) {
|
||||||
|
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error generating storage version map: %s", err)
|
||||||
|
}
|
||||||
|
storageFactory, err := kubeapiserver.NewStorageFactory(
|
||||||
|
s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs,
|
||||||
|
serverstorage.NewDefaultResourceEncodingConfig(api.Registry), storageGroupsToEncodingVersion,
|
||||||
|
// FIXME: this GroupVersionResource override should be configurable
|
||||||
|
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
|
||||||
|
master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error in initializing storage factory: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep Deployments in extensions for backwards compatibility, we'll have to migrate at some point, eventually
|
||||||
|
storageFactory.AddCohabitatingResources(extensions.Resource("deployments"), apps.Resource("deployments"))
|
||||||
|
for _, override := range s.Etcd.EtcdServersOverrides {
|
||||||
|
tokens := strings.Split(override, "#")
|
||||||
|
if len(tokens) != 2 {
|
||||||
|
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
apiresource := strings.Split(tokens[0], "/")
|
||||||
|
if len(apiresource) != 2 {
|
||||||
|
glog.Errorf("invalid resource definition: %s", tokens[0])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
group := apiresource[0]
|
||||||
|
resource := apiresource[1]
|
||||||
|
groupResource := schema.GroupResource{Group: group, Resource: resource}
|
||||||
|
|
||||||
|
servers := strings.Split(tokens[1], ";")
|
||||||
|
storageFactory.SetEtcdLocation(groupResource, servers)
|
||||||
|
}
|
||||||
|
|
||||||
|
return storageFactory, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultOptions(s *options.ServerRunOptions) error {
|
||||||
|
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error determining service IP ranges: %v", err)
|
||||||
|
}
|
||||||
|
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil {
|
||||||
|
return fmt.Errorf("error creating self-signed certificates: %v", err)
|
||||||
|
}
|
||||||
|
if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil {
|
||||||
|
return fmt.Errorf("error setting the external host value: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Authentication.ApplyAuthorization(s.Authorization)
|
||||||
|
|
||||||
|
// Default to the private server key for service account token signing
|
||||||
|
if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
|
||||||
|
if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
|
||||||
|
s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
|
||||||
|
} else {
|
||||||
|
glog.Warning("No TLS key provided, service account token authentication disabled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.Etcd.StorageConfig.DeserializationCacheSize == 0 {
|
||||||
|
// When size of cache is not explicitly set, estimate its size based on
|
||||||
|
// target memory usage.
|
||||||
|
glog.V(2).Infof("Initializing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
||||||
|
|
||||||
|
// This is the heuristics that from memory capacity is trying to infer
|
||||||
|
// the maximum number of nodes in the cluster and set cache sizes based
|
||||||
|
// on that value.
|
||||||
|
// From our documentation, we officially recomment 120GB machines for
|
||||||
|
// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
|
||||||
|
// capacity per node.
|
||||||
|
// TODO: We may consider deciding that some percentage of memory will
|
||||||
|
// be used for the deserialization cache and divide it by the max object
|
||||||
|
// size to compute its size. We may even go further and measure
|
||||||
|
// collective sizes of the objects in the cache.
|
||||||
|
clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
|
||||||
|
s.Etcd.StorageConfig.DeserializationCacheSize = 25 * clusterSize
|
||||||
|
if s.Etcd.StorageConfig.DeserializationCacheSize < 1000 {
|
||||||
|
s.Etcd.StorageConfig.DeserializationCacheSize = 1000
|
||||||
|
}
|
||||||
|
}
|
||||||
if s.Etcd.EnableWatchCache {
|
if s.Etcd.EnableWatchCache {
|
||||||
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
||||||
cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
|
cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
|
||||||
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
|
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
|
||||||
}
|
}
|
||||||
|
|
||||||
return config, sharedInformers, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readCAorNil(file string) ([]byte, error) {
|
func readCAorNil(file string) ([]byte, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user