/* Copyright 2014 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package app does all of the work necessary to create a Kubernetes // APIServer by binding together the API, master and APIServer infrastructure. // It can be configured and called directly or via the hyperkube framework. package app import ( "fmt" "io/ioutil" "os" "strings" "time" "github.com/golang/glog" "github.com/spf13/cobra" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authorization/authorizer" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/filters" serveroptions "k8s.io/apiserver/pkg/server/options" serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" apiserverflag "k8s.io/apiserver/pkg/util/flag" "k8s.io/apiserver/pkg/util/webhook" cacheddiscovery "k8s.io/client-go/discovery/cached" clientgoinformers "k8s.io/client-go/informers" clientgoclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" certutil "k8s.io/client-go/util/cert" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubeapiserver" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/master/reconcilers" quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/registry/cachesize" rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/version/verflag" utilflag "k8s.io/kubernetes/pkg/util/flag" _ "k8s.io/kubernetes/pkg/util/reflector/prometheus" // for reflector metric registration _ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration ) const etcdRetryLimit = 60 const etcdRetryInterval = 1 * time.Second var ( DefaultProxyDialerFn utilnet.DialFunc ) // NewAPIServerCommand creates a *cobra.Command object with default parameters func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command { s := options.NewServerRunOptions() cmd := &cobra.Command{ Use: "kube-apiserver", Long: `The Kubernetes API server validates and configures data for the api objects which include pods, services, replicationcontrollers, and others. The API Server services REST operations and provides the frontend to the cluster's shared state through which all other components interact.`, RunE: func(cmd *cobra.Command, args []string) error { verflag.PrintAndExitIfRequested() utilflag.PrintFlags(cmd.Flags()) // set default options completedOptions, err := Complete(s) if err != nil { return err } // validate options if errs := completedOptions.Validate(); len(errs) != 0 { return utilerrors.NewAggregate(errs) } return Run(completedOptions, stopCh) }, } fs := cmd.Flags() namedFlagSets := s.Flags() for _, f := range namedFlagSets.FlagSets { fs.AddFlagSet(f) } usageFmt := "Usage:\n %s\n" cols, _, _ := apiserverflag.TerminalSize(cmd.OutOrStdout()) cmd.SetUsageFunc(func(cmd *cobra.Command) error { fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine()) apiserverflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols) return nil }) cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) { fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine()) apiserverflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols) }) return cmd } // Run runs the specified APIServer. This should never exit. func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error { // To help debugging, immediately log version glog.Infof("Version: %+v", version.Get()) _, server, err := CreateServerChain(completeOptions, stopCh) if err != nil { return err } return server.PrepareRun().Run(stopCh) } // CreateServerChain creates the apiservers connected via delegation. func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*master.Config, *genericapiserver.GenericAPIServer, error) { completedOptions.KubeletConfig.Dial = DefaultProxyDialerFn kubeAPIServerConfig, _, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions) if err != nil { return nil, nil, err } // If additional API servers are added, they should be gated. apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount) if err != nil { return nil, nil, err } apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate()) if err != nil { return nil, nil, err } kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook) if err != nil { return nil, nil, err } apiExtensionsServer.GenericAPIServer.DiscoveryGroupManager = kubeAPIServer.GenericAPIServer.DiscoveryGroupManager // This will wire up openapi for extension api server apiExtensionsServer.GenericAPIServer.PrepareRun() return kubeAPIServerConfig, kubeAPIServer.GenericAPIServer, nil } // CreateKubeAPIServer creates and wires a workable kube-apiserver func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) { kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer) if err != nil { return nil, err } kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook) return kubeAPIServer, nil } // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them func CreateKubeAPIServerConfig( s completedServerRunOptions, ) ( config *master.Config, insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo, pluginInitializers []admission.PluginInitializer, admissionPostStartHook genericapiserver.PostStartHookFunc, lastErr error, ) { var genericConfig *genericapiserver.Config var storageFactory *serverstorage.DefaultStorageFactory var sharedInformers informers.SharedInformerFactory var versionedInformers clientgoinformers.SharedInformerFactory genericConfig, sharedInformers, versionedInformers, insecureServingInfo, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions) if lastErr != nil { return } capabilities.Initialize(capabilities.Capabilities{ AllowPrivileged: s.AllowPrivileged, // TODO(vmarmol): Implement support for HostNetworkSources. PrivilegedSources: capabilities.PrivilegedSources{ HostNetworkSources: []string{}, HostPIDSources: []string{}, HostIPCSources: []string{}, }, PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec, }) serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.ServiceClusterIPRange) if lastErr != nil { return } var ( issuer serviceaccount.TokenGenerator apiAudiences []string maxExpiration time.Duration ) if s.ServiceAccountSigningKeyFile != "" || s.Authentication.ServiceAccounts.Issuer != "" || len(s.Authentication.ServiceAccounts.APIAudiences) > 0 { if !utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) { lastErr = fmt.Errorf("the TokenRequest feature is not enabled but --service-account-signing-key-file, --service-account-issuer and/or --service-account-api-audiences flags were passed") return } if s.ServiceAccountSigningKeyFile == "" || s.Authentication.ServiceAccounts.Issuer == "" || len(s.Authentication.ServiceAccounts.APIAudiences) == 0 || len(s.Authentication.ServiceAccounts.KeyFiles) == 0 { lastErr = fmt.Errorf("service-account-signing-key-file, service-account-issuer, service-account-api-audiences and service-account-key-file should be specified together") return } sk, err := certutil.PrivateKeyFromFile(s.ServiceAccountSigningKeyFile) if err != nil { lastErr = fmt.Errorf("failed to parse service-account-issuer-key-file: %v", err) return } if s.Authentication.ServiceAccounts.MaxExpiration != 0 { lowBound := time.Hour upBound := time.Duration(1<<32) * time.Second if s.Authentication.ServiceAccounts.MaxExpiration < lowBound || s.Authentication.ServiceAccounts.MaxExpiration > upBound { lastErr = fmt.Errorf("the serviceaccount max expiration is out of range, must be between 1 hour to 2^32 seconds") return } } issuer, err = serviceaccount.JWTTokenGenerator(s.Authentication.ServiceAccounts.Issuer, sk) if err != nil { lastErr = fmt.Errorf("failed to build token generator: %v", err) return } apiAudiences = s.Authentication.ServiceAccounts.APIAudiences maxExpiration = s.Authentication.ServiceAccounts.MaxExpiration } config = &master.Config{ GenericConfig: genericConfig, ExtraConfig: master.ExtraConfig{ APIResourceConfigSource: storageFactory.APIResourceConfigSource, StorageFactory: storageFactory, EventTTL: s.EventTTL, KubeletClientConfig: s.KubeletConfig, EnableLogsSupport: s.EnableLogsHandler, ServiceIPRange: serviceIPRange, APIServerServiceIP: apiServerServiceIP, APIServerServicePort: 443, ServiceNodePortRange: s.ServiceNodePortRange, KubernetesServiceNodePort: s.KubernetesServiceNodePort, EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType), MasterCount: s.MasterCount, ServiceAccountIssuer: issuer, ServiceAccountAPIAudiences: apiAudiences, ServiceAccountMaxExpiration: maxExpiration, InternalInformers: sharedInformers, VersionedInformers: versionedInformers, }, } return } // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it func buildGenericConfig( s *options.ServerRunOptions, ) ( genericConfig *genericapiserver.Config, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory, insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo, pluginInitializers []admission.PluginInitializer, admissionPostStartHook genericapiserver.PostStartHookFunc, storageFactory *serverstorage.DefaultStorageFactory, lastErr error, ) { genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource() if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil { return } if lastErr = s.InsecureServing.ApplyTo(&insecureServingInfo, &genericConfig.LoopbackClientConfig); lastErr != nil { return } if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil { return } if lastErr = s.Authentication.ApplyTo(genericConfig); lastErr != nil { return } if lastErr = s.Audit.ApplyTo(genericConfig); lastErr != nil { return } if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil { return } if lastErr = s.APIEnablement.ApplyTo(genericConfig, master.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil { return } genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck( sets.NewString("watch", "proxy"), sets.NewString("attach", "exec", "proxy", "log", "portforward"), ) kubeVersion := version.Get() genericConfig.Version = &kubeVersion storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() storageFactoryConfig.ApiResourceConfig = genericConfig.MergedResourceConfig completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd, s.StorageSerialization) if err != nil { lastErr = err return } storageFactory, lastErr = completedStorageFactoryConfig.New() if lastErr != nil { return } if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { return } // Use protobufs for self-communication. // Since not every generic apiserver has to support protobufs, we // cannot default to it in generic apiserver and need to explicitly // set it in kube-apiserver. genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf" client, err := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig) if err != nil { lastErr = fmt.Errorf("failed to create clientset: %v", err) return } kubeClientConfig := genericConfig.LoopbackClientConfig sharedInformers = informers.NewSharedInformerFactory(client, 10*time.Minute) clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig) if err != nil { lastErr = fmt.Errorf("failed to create real external clientset: %v", err) return } versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) genericConfig.Authentication.Authenticator, err = BuildAuthenticator(s, clientgoExternalClient, sharedInformers) if err != nil { lastErr = fmt.Errorf("invalid authentication config: %v", err) return } genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers) if err != nil { lastErr = fmt.Errorf("invalid authorization config: %v", err) return } if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) { genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName) } webhookAuthResolverWrapper := func(delegate webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return &webhook.AuthenticationInfoResolverDelegator{ ClientConfigForFunc: func(server string) (*rest.Config, error) { if server == "kubernetes.default.svc" { return genericConfig.LoopbackClientConfig, nil } return delegate.ClientConfigFor(server) }, ClientConfigForServiceFunc: func(serviceName, serviceNamespace string) (*rest.Config, error) { if serviceName == "kubernetes" && serviceNamespace == "default" { return genericConfig.LoopbackClientConfig, nil } ret, err := delegate.ClientConfigForService(serviceName, serviceNamespace) if err != nil { return nil, err } return ret, err }, } } pluginInitializers, admissionPostStartHook, err = BuildAdmissionPluginInitializers( s, client, sharedInformers, webhookAuthResolverWrapper, ) if err != nil { lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err) return } err = s.Admission.ApplyTo( genericConfig, versionedInformers, kubeClientConfig, legacyscheme.Scheme, pluginInitializers...) if err != nil { lastErr = fmt.Errorf("failed to initialize admission: %v", err) } return } // BuildAdmissionPluginInitializers constructs the admission plugin initializer func BuildAdmissionPluginInitializers( s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, webhookAuthWrapper webhook.AuthenticationInfoResolverWrapper, ) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) { var cloudConfig []byte if s.CloudProvider.CloudConfigFile != "" { var err error cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile) if err != nil { glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err) } } // We have a functional client so we can use that to build our discovery backed REST mapper // Use a discovery client capable of being refreshed. discoveryClient := cacheddiscovery.NewMemCacheClient(client.Discovery()) discoveryRESTMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) admissionPostStartHook := func(context genericapiserver.PostStartHookContext) error { discoveryRESTMapper.Reset() go utilwait.Until(discoveryRESTMapper.Reset, 30*time.Second, context.StopCh) return nil } quotaConfiguration := quotainstall.NewQuotaConfigurationForAdmission() kubePluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, cloudConfig, discoveryRESTMapper, quotaConfiguration) return []admission.PluginInitializer{kubePluginInitializer}, admissionPostStartHook, nil } // BuildAuthenticator constructs the authenticator func BuildAuthenticator(s *options.ServerRunOptions, extclient clientgoclientset.Interface, sharedInformers informers.SharedInformerFactory) (authenticator.Request, error) { authenticatorConfig := s.Authentication.ToAuthenticationConfig() if s.Authentication.ServiceAccounts.Lookup { authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromClient(extclient) } return authenticatorConfig.New() } // BuildAuthorizer constructs the authorizer func BuildAuthorizer(s *options.ServerRunOptions, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) { authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers) return authorizationConfig.New() } // completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked. type completedServerRunOptions struct { *options.ServerRunOptions } // Complete set default ServerRunOptions. // Should be called after kube-apiserver flags parsed. func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) { var options completedServerRunOptions // set defaults if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil { return options, err } if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing.DeprecatedInsecureServingOptions); err != nil { return options, err } serviceIPRange, _, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) if err != nil { return options, fmt.Errorf("error determining service IP ranges: %v", err) } s.ServiceClusterIPRange = serviceIPRange if len(s.GenericServerRunOptions.ExternalHost) == 0 { if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 { s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String() } else { if hostname, err := os.Hostname(); err == nil { s.GenericServerRunOptions.ExternalHost = hostname } else { return options, fmt.Errorf("error finding host name: %v", err) } } glog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost) } s.Authentication.ApplyAuthorization(s.Authorization) // Use (ServiceAccountSigningKeyFile != "") as a proxy to the user enabling // TokenRequest functionality. This defaulting was convenient, but messed up // a lot of people when they rotated their serving cert with no idea it was // connected to their service account keys. We are taking this oppurtunity to // remove this problematic defaulting. if s.ServiceAccountSigningKeyFile == "" { // Default to the private server key for service account token signing if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" { if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) { s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile} } else { glog.Warning("No TLS key provided, service account token authentication disabled") } } } if s.Etcd.StorageConfig.DeserializationCacheSize == 0 { // When size of cache is not explicitly set, estimate its size based on // target memory usage. glog.V(2).Infof("Initializing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB) // This is the heuristics that from memory capacity is trying to infer // the maximum number of nodes in the cluster and set cache sizes based // on that value. // From our documentation, we officially recommend 120GB machines for // 2000 nodes, and we scale from that point. Thus we assume ~60MB of // capacity per node. // TODO: We may consider deciding that some percentage of memory will // be used for the deserialization cache and divide it by the max object // size to compute its size. We may even go further and measure // collective sizes of the objects in the cache. clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60 s.Etcd.StorageConfig.DeserializationCacheSize = 25 * clusterSize if s.Etcd.StorageConfig.DeserializationCacheSize < 1000 { s.Etcd.StorageConfig.DeserializationCacheSize = 1000 } } if s.Etcd.EnableWatchCache { glog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB) sizes := cachesize.NewHeuristicWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB) if userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes); err == nil { for resource, size := range userSpecified { sizes[resource] = size } } s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes) if err != nil { return options, err } } // TODO: remove when we stop supporting the legacy group version. if s.APIEnablement.RuntimeConfig != nil { for key, value := range s.APIEnablement.RuntimeConfig { if key == "v1" || strings.HasPrefix(key, "v1/") || key == "api/v1" || strings.HasPrefix(key, "api/v1/") { delete(s.APIEnablement.RuntimeConfig, key) s.APIEnablement.RuntimeConfig["/v1"] = value } if key == "api/legacy" { delete(s.APIEnablement.RuntimeConfig, key) } } } options.ServerRunOptions = s return options, nil } func readCAorNil(file string) ([]byte, error) { if len(file) == 0 { return nil, nil } return ioutil.ReadFile(file) }