mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-04 07:49:35 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			759 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			759 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
// Package app does all of the work necessary to create a Kubernetes
 | 
						|
// APIServer by binding together the API, master and APIServer infrastructure.
 | 
						|
// It can be configured and called directly or via the hyperkube framework.
 | 
						|
package app
 | 
						|
 | 
						|
import (
 | 
						|
	"crypto/tls"
 | 
						|
	"fmt"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"net/url"
 | 
						|
	"os"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/spf13/cobra"
 | 
						|
	"github.com/spf13/pflag"
 | 
						|
 | 
						|
	extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
 | 
						|
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 | 
						|
	utilnet "k8s.io/apimachinery/pkg/util/net"
 | 
						|
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						|
	utilwait "k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	"k8s.io/apiserver/pkg/admission"
 | 
						|
	"k8s.io/apiserver/pkg/authorization/authorizer"
 | 
						|
	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
 | 
						|
	genericfeatures "k8s.io/apiserver/pkg/features"
 | 
						|
	genericapiserver "k8s.io/apiserver/pkg/server"
 | 
						|
	"k8s.io/apiserver/pkg/server/egressselector"
 | 
						|
	"k8s.io/apiserver/pkg/server/filters"
 | 
						|
	serveroptions "k8s.io/apiserver/pkg/server/options"
 | 
						|
	serverstorage "k8s.io/apiserver/pkg/server/storage"
 | 
						|
	"k8s.io/apiserver/pkg/storage/etcd3/preflight"
 | 
						|
	"k8s.io/apiserver/pkg/util/feature"
 | 
						|
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						|
	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 | 
						|
	"k8s.io/apiserver/pkg/util/webhook"
 | 
						|
	clientgoinformers "k8s.io/client-go/informers"
 | 
						|
	clientgoclientset "k8s.io/client-go/kubernetes"
 | 
						|
	"k8s.io/client-go/rest"
 | 
						|
	"k8s.io/client-go/util/keyutil"
 | 
						|
	cloudprovider "k8s.io/cloud-provider"
 | 
						|
	cliflag "k8s.io/component-base/cli/flag"
 | 
						|
	"k8s.io/component-base/cli/globalflag"
 | 
						|
	_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
 | 
						|
	"k8s.io/component-base/term"
 | 
						|
	"k8s.io/component-base/version"
 | 
						|
	"k8s.io/component-base/version/verflag"
 | 
						|
	"k8s.io/klog/v2"
 | 
						|
	aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
 | 
						|
	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
 | 
						|
 | 
						|
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
 | 
						|
	"k8s.io/kubernetes/pkg/api/legacyscheme"
 | 
						|
	"k8s.io/kubernetes/pkg/capabilities"
 | 
						|
	"k8s.io/kubernetes/pkg/controlplane"
 | 
						|
	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
 | 
						|
	"k8s.io/kubernetes/pkg/controlplane/tunneler"
 | 
						|
	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
 | 
						|
	"k8s.io/kubernetes/pkg/kubeapiserver"
 | 
						|
	kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
 | 
						|
	kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
 | 
						|
	"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
 | 
						|
	rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
 | 
						|
	"k8s.io/kubernetes/pkg/serviceaccount"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	etcdRetryLimit    = 60
 | 
						|
	etcdRetryInterval = 1 * time.Second
 | 
						|
)
 | 
						|
 | 
						|
// TODO: delete this check after insecure flags removed in v1.24
 | 
						|
func checkNonZeroInsecurePort(fs *pflag.FlagSet) error {
 | 
						|
	for _, name := range options.InsecurePortFlags {
 | 
						|
		val, err := fs.GetInt(name)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if val != 0 {
 | 
						|
			return fmt.Errorf("invalid port value %d: only zero is allowed", val)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// NewAPIServerCommand creates a *cobra.Command object with default parameters
 | 
						|
func NewAPIServerCommand() *cobra.Command {
 | 
						|
	s := options.NewServerRunOptions()
 | 
						|
	cmd := &cobra.Command{
 | 
						|
		Use: "kube-apiserver",
 | 
						|
		Long: `The Kubernetes API server validates and configures data
 | 
						|
for the api objects which include pods, services, replicationcontrollers, and
 | 
						|
others. The API Server services REST operations and provides the frontend to the
 | 
						|
cluster's shared state through which all other components interact.`,
 | 
						|
 | 
						|
		// stop printing usage when the command errors
 | 
						|
		SilenceUsage: true,
 | 
						|
		PersistentPreRunE: func(*cobra.Command, []string) error {
 | 
						|
			// silence client-go warnings.
 | 
						|
			// kube-apiserver loopback clients should not log self-issued warnings.
 | 
						|
			rest.SetDefaultWarningHandler(rest.NoWarnings{})
 | 
						|
			return nil
 | 
						|
		},
 | 
						|
		RunE: func(cmd *cobra.Command, args []string) error {
 | 
						|
			verflag.PrintAndExitIfRequested()
 | 
						|
			fs := cmd.Flags()
 | 
						|
			cliflag.PrintFlags(fs)
 | 
						|
 | 
						|
			err := checkNonZeroInsecurePort(fs)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			// set default options
 | 
						|
			completedOptions, err := Complete(s)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
 | 
						|
			// validate options
 | 
						|
			if errs := completedOptions.Validate(); len(errs) != 0 {
 | 
						|
				return utilerrors.NewAggregate(errs)
 | 
						|
			}
 | 
						|
 | 
						|
			return Run(completedOptions, genericapiserver.SetupSignalHandler())
 | 
						|
		},
 | 
						|
		Args: func(cmd *cobra.Command, args []string) error {
 | 
						|
			for _, arg := range args {
 | 
						|
				if len(arg) > 0 {
 | 
						|
					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			return nil
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	fs := cmd.Flags()
 | 
						|
	namedFlagSets := s.Flags()
 | 
						|
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
 | 
						|
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
 | 
						|
	options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
 | 
						|
	for _, f := range namedFlagSets.FlagSets {
 | 
						|
		fs.AddFlagSet(f)
 | 
						|
	}
 | 
						|
 | 
						|
	usageFmt := "Usage:\n  %s\n"
 | 
						|
	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
 | 
						|
	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
 | 
						|
		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
 | 
						|
		cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
 | 
						|
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
 | 
						|
		cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
 | 
						|
	})
 | 
						|
 | 
						|
	return cmd
 | 
						|
}
 | 
						|
 | 
						|
// Run runs the specified APIServer.  This should never exit.
 | 
						|
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
 | 
						|
	// To help debugging, immediately log version
 | 
						|
	klog.Infof("Version: %+v", version.Get())
 | 
						|
 | 
						|
	server, err := CreateServerChain(completeOptions, stopCh)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	prepared, err := server.PrepareRun()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return prepared.Run(stopCh)
 | 
						|
}
 | 
						|
 | 
						|
// CreateServerChain creates the apiservers connected via delegation.
 | 
						|
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
 | 
						|
	nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// If additional API servers are added, they should be gated.
 | 
						|
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
 | 
						|
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// aggregator comes last in the chain
 | 
						|
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
 | 
						|
	if err != nil {
 | 
						|
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return aggregatorServer, nil
 | 
						|
}
 | 
						|
 | 
						|
// CreateKubeAPIServer creates and wires a workable kube-apiserver
 | 
						|
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
 | 
						|
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return kubeAPIServer, nil
 | 
						|
}
 | 
						|
 | 
						|
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
 | 
						|
func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
 | 
						|
	// Setup nodeTunneler if needed
 | 
						|
	var nodeTunneler tunneler.Tunneler
 | 
						|
	var proxyDialerFn utilnet.DialFunc
 | 
						|
	if len(s.SSHUser) > 0 {
 | 
						|
		// Get ssh key distribution func, if supported
 | 
						|
		var installSSHKey tunneler.InstallSSHKey
 | 
						|
 | 
						|
		cloudprovider.DeprecationWarningForProvider(s.CloudProvider.CloudProvider)
 | 
						|
		cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
 | 
						|
		if err != nil {
 | 
						|
			return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
 | 
						|
		}
 | 
						|
		if cloud != nil {
 | 
						|
			if instances, supported := cloud.Instances(); supported {
 | 
						|
				installSSHKey = instances.AddSSHKeyToAllInstances
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if s.KubeletConfig.Port == 0 {
 | 
						|
			return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
 | 
						|
		}
 | 
						|
		if s.KubeletConfig.ReadOnlyPort == 0 {
 | 
						|
			return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
 | 
						|
		}
 | 
						|
		// Set up the nodeTunneler
 | 
						|
		// TODO(cjcullen): If we want this to handle per-kubelet ports or other
 | 
						|
		// kubelet listen-addresses, we need to plumb through options.
 | 
						|
		healthCheckPath := &url.URL{
 | 
						|
			Scheme: "http",
 | 
						|
			Host:   net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
 | 
						|
			Path:   "healthz",
 | 
						|
		}
 | 
						|
		nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
 | 
						|
 | 
						|
		// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
 | 
						|
		proxyDialerFn = nodeTunneler.Dial
 | 
						|
	}
 | 
						|
	// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
 | 
						|
	proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
 | 
						|
	proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
 | 
						|
		DialContext:     proxyDialerFn,
 | 
						|
		TLSClientConfig: proxyTLSClientConfig,
 | 
						|
	})
 | 
						|
	return nodeTunneler, proxyTransport, nil
 | 
						|
}
 | 
						|
 | 
						|
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
 | 
						|
func CreateKubeAPIServerConfig(
 | 
						|
	s completedServerRunOptions,
 | 
						|
	nodeTunneler tunneler.Tunneler,
 | 
						|
	proxyTransport *http.Transport,
 | 
						|
) (
 | 
						|
	*controlplane.Config,
 | 
						|
	aggregatorapiserver.ServiceResolver,
 | 
						|
	[]admission.PluginInitializer,
 | 
						|
	error,
 | 
						|
) {
 | 
						|
	genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
 | 
						|
		if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
 | 
						|
			return nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	capabilities.Initialize(capabilities.Capabilities{
 | 
						|
		AllowPrivileged: s.AllowPrivileged,
 | 
						|
		// TODO(vmarmol): Implement support for HostNetworkSources.
 | 
						|
		PrivilegedSources: capabilities.PrivilegedSources{
 | 
						|
			HostNetworkSources: []string{},
 | 
						|
			HostPIDSources:     []string{},
 | 
						|
			HostIPCSources:     []string{},
 | 
						|
		},
 | 
						|
		PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
 | 
						|
	})
 | 
						|
 | 
						|
	s.Metrics.Apply()
 | 
						|
	serviceaccount.RegisterMetrics()
 | 
						|
 | 
						|
	s.Logs.Apply()
 | 
						|
 | 
						|
	config := &controlplane.Config{
 | 
						|
		GenericConfig: genericConfig,
 | 
						|
		ExtraConfig: controlplane.ExtraConfig{
 | 
						|
			APIResourceConfigSource: storageFactory.APIResourceConfigSource,
 | 
						|
			StorageFactory:          storageFactory,
 | 
						|
			EventTTL:                s.EventTTL,
 | 
						|
			KubeletClientConfig:     s.KubeletConfig,
 | 
						|
			EnableLogsSupport:       s.EnableLogsHandler,
 | 
						|
			ProxyTransport:          proxyTransport,
 | 
						|
 | 
						|
			Tunneler: nodeTunneler,
 | 
						|
 | 
						|
			ServiceIPRange:          s.PrimaryServiceClusterIPRange,
 | 
						|
			APIServerServiceIP:      s.APIServerServiceIP,
 | 
						|
			SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
 | 
						|
 | 
						|
			APIServerServicePort: 443,
 | 
						|
 | 
						|
			ServiceNodePortRange:      s.ServiceNodePortRange,
 | 
						|
			KubernetesServiceNodePort: s.KubernetesServiceNodePort,
 | 
						|
 | 
						|
			EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
 | 
						|
			MasterCount:            s.MasterCount,
 | 
						|
 | 
						|
			ServiceAccountIssuer:        s.ServiceAccountIssuer,
 | 
						|
			ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
 | 
						|
			ExtendExpiration:            s.Authentication.ServiceAccounts.ExtendExpiration,
 | 
						|
 | 
						|
			VersionedInformers: versionedInformers,
 | 
						|
 | 
						|
			IdentityLeaseDurationSeconds:      s.IdentityLeaseDurationSeconds,
 | 
						|
			IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	clientCAProvider, err := s.Authentication.ClientCert.GetClientCAContentProvider()
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, nil, err
 | 
						|
	}
 | 
						|
	config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
 | 
						|
 | 
						|
	requestHeaderConfig, err := s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, nil, err
 | 
						|
	}
 | 
						|
	if requestHeaderConfig != nil {
 | 
						|
		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
 | 
						|
		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
 | 
						|
		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
 | 
						|
		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
 | 
						|
		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
 | 
						|
	}
 | 
						|
 | 
						|
	if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
 | 
						|
		return nil, nil, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if nodeTunneler != nil {
 | 
						|
		// Use the nodeTunneler's dialer to connect to the kubelet
 | 
						|
		config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
 | 
						|
	}
 | 
						|
	if config.GenericConfig.EgressSelector != nil {
 | 
						|
		// Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet
 | 
						|
		config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
 | 
						|
 | 
						|
		// Use the config.GenericConfig.EgressSelector lookup as the transport used by the "proxy" subresources.
 | 
						|
		networkContext := egressselector.Cluster.AsNetworkContext()
 | 
						|
		dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)
 | 
						|
		if err != nil {
 | 
						|
			return nil, nil, nil, err
 | 
						|
		}
 | 
						|
		c := proxyTransport.Clone()
 | 
						|
		c.DialContext = dialer
 | 
						|
		config.ExtraConfig.ProxyTransport = c
 | 
						|
	}
 | 
						|
 | 
						|
	// Load the public keys.
 | 
						|
	var pubKeys []interface{}
 | 
						|
	for _, f := range s.Authentication.ServiceAccounts.KeyFiles {
 | 
						|
		keys, err := keyutil.PublicKeysFromFile(f)
 | 
						|
		if err != nil {
 | 
						|
			return nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)
 | 
						|
		}
 | 
						|
		pubKeys = append(pubKeys, keys...)
 | 
						|
	}
 | 
						|
	// Plumb the required metadata through ExtraConfig.
 | 
						|
	config.ExtraConfig.ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuer
 | 
						|
	config.ExtraConfig.ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
 | 
						|
	config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
 | 
						|
 | 
						|
	return config, serviceResolver, pluginInitializers, nil
 | 
						|
}
 | 
						|
 | 
						|
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
 | 
						|
func buildGenericConfig(
 | 
						|
	s *options.ServerRunOptions,
 | 
						|
	proxyTransport *http.Transport,
 | 
						|
) (
 | 
						|
	genericConfig *genericapiserver.Config,
 | 
						|
	versionedInformers clientgoinformers.SharedInformerFactory,
 | 
						|
	serviceResolver aggregatorapiserver.ServiceResolver,
 | 
						|
	pluginInitializers []admission.PluginInitializer,
 | 
						|
	admissionPostStartHook genericapiserver.PostStartHookFunc,
 | 
						|
	storageFactory *serverstorage.DefaultStorageFactory,
 | 
						|
	lastErr error,
 | 
						|
) {
 | 
						|
	genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
 | 
						|
	genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
 | 
						|
 | 
						|
	if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
 | 
						|
	genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
 | 
						|
	genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
 | 
						|
		sets.NewString("watch", "proxy"),
 | 
						|
		sets.NewString("attach", "exec", "proxy", "log", "portforward"),
 | 
						|
	)
 | 
						|
 | 
						|
	kubeVersion := version.Get()
 | 
						|
	genericConfig.Version = &kubeVersion
 | 
						|
 | 
						|
	storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
 | 
						|
	storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
 | 
						|
	completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
 | 
						|
	if err != nil {
 | 
						|
		lastErr = err
 | 
						|
		return
 | 
						|
	}
 | 
						|
	storageFactory, lastErr = completedStorageFactoryConfig.New()
 | 
						|
	if lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if genericConfig.EgressSelector != nil {
 | 
						|
		storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
 | 
						|
	}
 | 
						|
	if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// Use protobufs for self-communication.
 | 
						|
	// Since not every generic apiserver has to support protobufs, we
 | 
						|
	// cannot default to it in generic apiserver and need to explicitly
 | 
						|
	// set it in kube-apiserver.
 | 
						|
	genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
 | 
						|
	// Disable compression for self-communication, since we are going to be
 | 
						|
	// on a fast local network
 | 
						|
	genericConfig.LoopbackClientConfig.DisableCompression = true
 | 
						|
 | 
						|
	kubeClientConfig := genericConfig.LoopbackClientConfig
 | 
						|
	clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
 | 
						|
	if err != nil {
 | 
						|
		lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
 | 
						|
 | 
						|
	// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
 | 
						|
	if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
 | 
						|
	if err != nil {
 | 
						|
		lastErr = fmt.Errorf("invalid authorization config: %v", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
 | 
						|
		genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
 | 
						|
	}
 | 
						|
 | 
						|
	lastErr = s.Audit.ApplyTo(genericConfig)
 | 
						|
	if lastErr != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	admissionConfig := &kubeapiserveradmission.Config{
 | 
						|
		ExternalInformers:    versionedInformers,
 | 
						|
		LoopbackClientConfig: genericConfig.LoopbackClientConfig,
 | 
						|
		CloudConfigFile:      s.CloudProvider.CloudConfigFile,
 | 
						|
	}
 | 
						|
	serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
 | 
						|
	pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
 | 
						|
	if err != nil {
 | 
						|
		lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	err = s.Admission.ApplyTo(
 | 
						|
		genericConfig,
 | 
						|
		versionedInformers,
 | 
						|
		kubeClientConfig,
 | 
						|
		feature.DefaultFeatureGate,
 | 
						|
		pluginInitializers...)
 | 
						|
	if err != nil {
 | 
						|
		lastErr = fmt.Errorf("failed to initialize admission: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
 | 
						|
		genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
 | 
						|
	}
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// BuildAuthorizer constructs the authorizer
 | 
						|
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
 | 
						|
	authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
 | 
						|
 | 
						|
	if EgressSelector != nil {
 | 
						|
		egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
 | 
						|
		if err != nil {
 | 
						|
			return nil, nil, err
 | 
						|
		}
 | 
						|
		authorizationConfig.CustomDial = egressDialer
 | 
						|
	}
 | 
						|
 | 
						|
	return authorizationConfig.New()
 | 
						|
}
 | 
						|
 | 
						|
// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
 | 
						|
func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) utilflowcontrol.Interface {
 | 
						|
	return utilflowcontrol.New(
 | 
						|
		versionedInformer,
 | 
						|
		extclient.FlowcontrolV1beta1(),
 | 
						|
		s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
 | 
						|
		s.GenericServerRunOptions.RequestTimeout/4,
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
// completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
 | 
						|
type completedServerRunOptions struct {
 | 
						|
	*options.ServerRunOptions
 | 
						|
}
 | 
						|
 | 
						|
// Complete set default ServerRunOptions.
 | 
						|
// Should be called after kube-apiserver flags parsed.
 | 
						|
func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
 | 
						|
	var options completedServerRunOptions
 | 
						|
	// set defaults
 | 
						|
	if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
 | 
						|
		return options, err
 | 
						|
	}
 | 
						|
 | 
						|
	// process s.ServiceClusterIPRange from list to Primary and Secondary
 | 
						|
	// we process secondary only if provided by user
 | 
						|
	apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
 | 
						|
	if err != nil {
 | 
						|
		return options, err
 | 
						|
	}
 | 
						|
	s.PrimaryServiceClusterIPRange = primaryServiceIPRange
 | 
						|
	s.SecondaryServiceClusterIPRange = secondaryServiceIPRange
 | 
						|
	s.APIServerServiceIP = apiServerServiceIP
 | 
						|
 | 
						|
	if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
 | 
						|
		return options, fmt.Errorf("error creating self-signed certificates: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	if len(s.GenericServerRunOptions.ExternalHost) == 0 {
 | 
						|
		if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 {
 | 
						|
			s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String()
 | 
						|
		} else {
 | 
						|
			if hostname, err := os.Hostname(); err == nil {
 | 
						|
				s.GenericServerRunOptions.ExternalHost = hostname
 | 
						|
			} else {
 | 
						|
				return options, fmt.Errorf("error finding host name: %v", err)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		klog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
 | 
						|
	}
 | 
						|
 | 
						|
	s.Authentication.ApplyAuthorization(s.Authorization)
 | 
						|
 | 
						|
	// Use (ServiceAccountSigningKeyFile != "") as a proxy to the user enabling
 | 
						|
	// TokenRequest functionality. This defaulting was convenient, but messed up
 | 
						|
	// a lot of people when they rotated their serving cert with no idea it was
 | 
						|
	// connected to their service account keys. We are taking this opportunity to
 | 
						|
	// remove this problematic defaulting.
 | 
						|
	if s.ServiceAccountSigningKeyFile == "" {
 | 
						|
		// Default to the private server key for service account token signing
 | 
						|
		if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
 | 
						|
			if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
 | 
						|
				s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
 | 
						|
			} else {
 | 
						|
				klog.Warning("No TLS key provided, service account token authentication disabled")
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if s.ServiceAccountSigningKeyFile != "" && s.Authentication.ServiceAccounts.Issuer != "" {
 | 
						|
		sk, err := keyutil.PrivateKeyFromFile(s.ServiceAccountSigningKeyFile)
 | 
						|
		if err != nil {
 | 
						|
			return options, fmt.Errorf("failed to parse service-account-issuer-key-file: %v", err)
 | 
						|
		}
 | 
						|
		if s.Authentication.ServiceAccounts.MaxExpiration != 0 {
 | 
						|
			lowBound := time.Hour
 | 
						|
			upBound := time.Duration(1<<32) * time.Second
 | 
						|
			if s.Authentication.ServiceAccounts.MaxExpiration < lowBound ||
 | 
						|
				s.Authentication.ServiceAccounts.MaxExpiration > upBound {
 | 
						|
				return options, fmt.Errorf("the service-account-max-token-expiration must be between 1 hour and 2^32 seconds")
 | 
						|
			}
 | 
						|
			if s.Authentication.ServiceAccounts.ExtendExpiration {
 | 
						|
				if s.Authentication.ServiceAccounts.MaxExpiration < serviceaccount.WarnOnlyBoundTokenExpirationSeconds*time.Second {
 | 
						|
					klog.Warningf("service-account-extend-token-expiration is true, in order to correctly trigger safe transition logic, service-account-max-token-expiration must be set longer than %d seconds (currently %s)", serviceaccount.WarnOnlyBoundTokenExpirationSeconds, s.Authentication.ServiceAccounts.MaxExpiration)
 | 
						|
				}
 | 
						|
				if s.Authentication.ServiceAccounts.MaxExpiration < serviceaccount.ExpirationExtensionSeconds*time.Second {
 | 
						|
					klog.Warningf("service-account-extend-token-expiration is true, enabling tokens valid up to %d seconds, which is longer than service-account-max-token-expiration set to %s seconds", serviceaccount.ExpirationExtensionSeconds, s.Authentication.ServiceAccounts.MaxExpiration)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		s.ServiceAccountIssuer, err = serviceaccount.JWTTokenGenerator(s.Authentication.ServiceAccounts.Issuer, sk)
 | 
						|
		if err != nil {
 | 
						|
			return options, fmt.Errorf("failed to build token generator: %v", err)
 | 
						|
		}
 | 
						|
		s.ServiceAccountTokenMaxExpiration = s.Authentication.ServiceAccounts.MaxExpiration
 | 
						|
	}
 | 
						|
 | 
						|
	if s.Etcd.EnableWatchCache {
 | 
						|
		sizes := kubeapiserver.DefaultWatchCacheSizes()
 | 
						|
		// Ensure that overrides parse correctly.
 | 
						|
		userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes)
 | 
						|
		if err != nil {
 | 
						|
			return options, err
 | 
						|
		}
 | 
						|
		for resource, size := range userSpecified {
 | 
						|
			sizes[resource] = size
 | 
						|
		}
 | 
						|
		s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
 | 
						|
		if err != nil {
 | 
						|
			return options, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if s.APIEnablement.RuntimeConfig != nil {
 | 
						|
		for key, value := range s.APIEnablement.RuntimeConfig {
 | 
						|
			if key == "v1" || strings.HasPrefix(key, "v1/") ||
 | 
						|
				key == "api/v1" || strings.HasPrefix(key, "api/v1/") {
 | 
						|
				delete(s.APIEnablement.RuntimeConfig, key)
 | 
						|
				s.APIEnablement.RuntimeConfig["/v1"] = value
 | 
						|
			}
 | 
						|
			if key == "api/legacy" {
 | 
						|
				delete(s.APIEnablement.RuntimeConfig, key)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	options.ServerRunOptions = s
 | 
						|
	return options, nil
 | 
						|
}
 | 
						|
 | 
						|
func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
 | 
						|
	var serviceResolver webhook.ServiceResolver
 | 
						|
	if enabledAggregatorRouting {
 | 
						|
		serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
 | 
						|
			informer.Core().V1().Services().Lister(),
 | 
						|
			informer.Core().V1().Endpoints().Lister(),
 | 
						|
		)
 | 
						|
	} else {
 | 
						|
		serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
 | 
						|
			informer.Core().V1().Services().Lister(),
 | 
						|
		)
 | 
						|
	}
 | 
						|
	// resolve kubernetes.default.svc locally
 | 
						|
	if localHost, err := url.Parse(hostname); err == nil {
 | 
						|
		serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
 | 
						|
	}
 | 
						|
	return serviceResolver
 | 
						|
}
 | 
						|
 | 
						|
func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, net.IPNet, error) {
 | 
						|
	serviceClusterIPRangeList := []string{}
 | 
						|
	if serviceClusterIPRanges != "" {
 | 
						|
		serviceClusterIPRangeList = strings.Split(serviceClusterIPRanges, ",")
 | 
						|
	}
 | 
						|
 | 
						|
	var apiServerServiceIP net.IP
 | 
						|
	var primaryServiceIPRange net.IPNet
 | 
						|
	var secondaryServiceIPRange net.IPNet
 | 
						|
	var err error
 | 
						|
	// nothing provided by user, use default range (only applies to the Primary)
 | 
						|
	if len(serviceClusterIPRangeList) == 0 {
 | 
						|
		var primaryServiceClusterCIDR net.IPNet
 | 
						|
		primaryServiceIPRange, apiServerServiceIP, err = controlplane.ServiceIPRange(primaryServiceClusterCIDR)
 | 
						|
		if err != nil {
 | 
						|
			return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges: %v", err)
 | 
						|
		}
 | 
						|
		return apiServerServiceIP, primaryServiceIPRange, net.IPNet{}, nil
 | 
						|
	}
 | 
						|
 | 
						|
	if len(serviceClusterIPRangeList) > 0 {
 | 
						|
		_, primaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[0])
 | 
						|
		if err != nil {
 | 
						|
			return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr")
 | 
						|
		}
 | 
						|
 | 
						|
		primaryServiceIPRange, apiServerServiceIP, err = controlplane.ServiceIPRange(*(primaryServiceClusterCIDR))
 | 
						|
		if err != nil {
 | 
						|
			return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// user provided at least two entries
 | 
						|
	// note: validation asserts that the list is max of two dual stack entries
 | 
						|
	if len(serviceClusterIPRangeList) > 1 {
 | 
						|
		_, secondaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[1])
 | 
						|
		if err != nil {
 | 
						|
			return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[1] is not an ip net")
 | 
						|
		}
 | 
						|
		secondaryServiceIPRange = *secondaryServiceClusterCIDR
 | 
						|
	}
 | 
						|
	return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
 | 
						|
}
 |