Plumb service resolver to webhook AC

This commit is contained in:
Daniel Smith
2017-06-01 17:11:23 -07:00
parent ad4d965711
commit cadaaa349a
3 changed files with 140 additions and 64 deletions

View File

@@ -52,7 +52,11 @@ import (
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
serverstorage "k8s.io/apiserver/pkg/server/storage"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
//aggregatorinformers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/cmd/kube-apiserver/app/preflight"
"k8s.io/kubernetes/pkg/api"
@@ -111,7 +115,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
if err != nil {
return err
}
@@ -154,6 +159,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
aggregatorConfig.ProxyTransport = proxyTransport
aggregatorConfig.ServiceResolver = serviceResolver
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
@@ -230,27 +237,27 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra
}
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
// register all admission plugins
registerAllAdmissionPlugins(s.Admission.Plugins)
// set defaults in the options before trying to create the generic config
if err := defaultOptions(s); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
// validate options
if errs := s.Validate(); len(errs) != 0 {
return nil, nil, nil, utilerrors.NewAggregate(errs)
return nil, nil, nil, nil, utilerrors.NewAggregate(errs)
}
genericConfig, sharedInformers, insecureServingOptions, err := BuildGenericConfig(s)
genericConfig, sharedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
return nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
}
capabilities.Initialize(capabilities.Capabilities{
@@ -266,21 +273,21 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
config := &master.Config{
@@ -321,30 +328,30 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
config.KubeletClientConfig.Dial = nodeTunneler.Dial
}
return config, sharedInformers, insecureServingOptions, nil
return config, sharedInformers, insecureServingOptions, serviceResolver, nil
}
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
genericConfig := genericapiserver.NewConfig(api.Codecs)
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
insecureServingOptions, err := s.InsecureServing.ApplyTo(genericConfig)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Audit.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Features.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
@@ -362,10 +369,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
// Use protobufs for self-communication.
@@ -378,7 +385,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
if err != nil {
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
if len(kubeAPIVersions) == 0 {
return nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err)
}
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
@@ -389,18 +396,36 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
}
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
clientgoExternalClient, err := clientgoclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create real external clientset: %v", err)
}
aggregatorInformers := clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
var serviceResolver aggregatorapiserver.ServiceResolver
if s.EnableAggregatorRouting {
serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
aggregatorInformers.Core().V1().Services().Lister(),
aggregatorInformers.Core().V1().Endpoints().Lister(),
)
} else {
serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
aggregatorInformers.Core().V1().Services().Lister(),
)
}
genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
return nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
}
genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
return nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
}
if !sets.NewString(s.Authorization.Modes()...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
@@ -412,22 +437,23 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
externalClient,
sharedInformers,
genericConfig.Authorizer,
serviceResolver,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}
err = s.Admission.ApplyTo(
genericConfig,
pluginInitializer)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
}
return genericConfig, sharedInformers, insecureServingOptions, nil
return genericConfig, sharedInformers, insecureServingOptions, serviceResolver, nil
}
// BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer, serviceResolver aggregatorapiserver.ServiceResolver) (admission.PluginInitializer, error) {
var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
@@ -460,6 +486,8 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna
pluginInitializer = pluginInitializer.SetClientCert(certBytes, keyBytes)
}
pluginInitializer = pluginInitializer.SetServiceResolver(serviceResolver)
return pluginInitializer, nil
}