Merge pull request #46812 from lavalamp/whitlockjc-plumbing

Automatic merge from submit-queue (batch tested with PRs 47726, 47693, 46909, 46812)

Plumb service resolver into webhook AC

This is the last piece of plumbing needed for https://github.com/kubernetes/features/issues/209
This commit is contained in:
Kubernetes Submit Queue 2017-06-19 18:34:06 -07:00 committed by GitHub
commit 6bab8dc493
7 changed files with 144 additions and 66 deletions

View File

@ -96,6 +96,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options/encryptionconfig:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library",

View File

@ -52,7 +52,11 @@ import (
"k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
serverstorage "k8s.io/apiserver/pkg/server/storage"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
//aggregatorinformers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/cmd/kube-apiserver/app/preflight"
"k8s.io/kubernetes/pkg/api"
@ -111,7 +115,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
if err != nil {
return err
}
@ -154,6 +159,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
aggregatorConfig.ProxyTransport = proxyTransport
aggregatorConfig.ServiceResolver = serviceResolver
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
@ -230,27 +237,27 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra
}
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
// register all admission plugins
registerAllAdmissionPlugins(s.Admission.Plugins)
// set defaults in the options before trying to create the generic config
if err := defaultOptions(s); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
// validate options
if errs := s.Validate(); len(errs) != 0 {
return nil, nil, nil, utilerrors.NewAggregate(errs)
return nil, nil, nil, nil, utilerrors.NewAggregate(errs)
}
genericConfig, sharedInformers, insecureServingOptions, err := BuildGenericConfig(s)
genericConfig, sharedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
return nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
}
capabilities.Initialize(capabilities.Capabilities{
@ -266,21 +273,21 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
config := &master.Config{
@ -321,30 +328,30 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
config.KubeletClientConfig.Dial = nodeTunneler.Dial
}
return config, sharedInformers, insecureServingOptions, nil
return config, sharedInformers, insecureServingOptions, serviceResolver, nil
}
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
genericConfig := genericapiserver.NewConfig(api.Codecs)
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
insecureServingOptions, err := s.InsecureServing.ApplyTo(genericConfig)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Audit.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Features.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
@ -362,10 +369,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
// Use protobufs for self-communication.
@ -378,7 +385,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
if err != nil {
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
if len(kubeAPIVersions) == 0 {
return nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err)
}
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
@ -389,18 +396,36 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
}
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
clientgoExternalClient, err := clientgoclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create real external clientset: %v", err)
}
aggregatorInformers := clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
var serviceResolver aggregatorapiserver.ServiceResolver
if s.EnableAggregatorRouting {
serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
aggregatorInformers.Core().V1().Services().Lister(),
aggregatorInformers.Core().V1().Endpoints().Lister(),
)
} else {
serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
aggregatorInformers.Core().V1().Services().Lister(),
)
}
genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
return nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
}
genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
return nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
}
if !sets.NewString(s.Authorization.Modes()...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
@ -412,22 +437,23 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
externalClient,
sharedInformers,
genericConfig.Authorizer,
serviceResolver,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}
err = s.Admission.ApplyTo(
genericConfig,
pluginInitializer)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
}
return genericConfig, sharedInformers, insecureServingOptions, nil
return genericConfig, sharedInformers, insecureServingOptions, serviceResolver, nil
}
// BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer, serviceResolver aggregatorapiserver.ServiceResolver) (admission.PluginInitializer, error) {
var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
@ -460,6 +486,8 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna
pluginInitializer = pluginInitializer.SetClientCert(certBytes, keyBytes)
}
pluginInitializer = pluginInitializer.SetServiceResolver(serviceResolver)
return pluginInitializer, nil
}

View File

@ -37,6 +37,7 @@ go_library(
"apiservice_controller.go",
"handler_apis.go",
"handler_proxy.go",
"resolvers.go",
],
tags = ["automanaged"],
deps = [

View File

@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"net/http"
"net/url"
"sync"
"time"
@ -34,10 +33,8 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/util/proxy"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/version"
"bytes"
@ -89,19 +86,6 @@ func init() {
// legacyAPIServiceName is the fixed name of the only non-groupified API version
const legacyAPIServiceName = "v1."
type ServiceResolver interface {
ResolveEndpoint(namespace, name string) (*url.URL, error)
}
type aggregatorEndpointRouting struct {
services listersv1.ServiceLister
endpoints listersv1.EndpointsLister
}
type aggregatorClusterRouting struct {
services listersv1.ServiceLister
}
type Config struct {
GenericConfig *genericapiserver.Config
CoreAPIServerClient kubeclientset.Interface
@ -110,10 +94,19 @@ type Config struct {
// this to confirm the proxy's identity
ProxyClientCert []byte
ProxyClientKey []byte
ProxyTransport *http.Transport
// Indicates if the Aggregator should send to the cluster IP (false) or route to the endpoints IP (true)
// If present, the Dial method will be used for dialing out to delegate
// apiservers.
ProxyTransport *http.Transport
// Indicates if the Aggregator should send to the service's cluster IP
// (false) or route to the one of the service's endpoint's IP (true);
// if ServiceResolver is provided, then this is ignored.
EnableAggregatorRouting bool
// Mechanism by which the Aggregator will resolve services. If nil,
// constructed based on the value of EnableAggregatorRouting.
ServiceResolver ServiceResolver
}
// APIAggregator contains state for a Kubernetes cluster master/api server.
@ -186,14 +179,6 @@ func (c *Config) SkipComplete() completedConfig {
return completedConfig{c}
}
func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name)
}
func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveCluster(r.services, namespace, name)
}
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time
@ -211,15 +196,15 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
)
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
var routing ServiceResolver
if c.EnableAggregatorRouting {
routing = &aggregatorEndpointRouting{
services: kubeInformers.Core().V1().Services().Lister(),
endpoints: kubeInformers.Core().V1().Endpoints().Lister(),
}
} else {
routing = &aggregatorClusterRouting{
services: kubeInformers.Core().V1().Services().Lister(),
var routing ServiceResolver = c.ServiceResolver
if routing == nil {
if c.EnableAggregatorRouting {
routing = NewEndpointServiceResolver(
kubeInformers.Core().V1().Services().Lister(),
kubeInformers.Core().V1().Endpoints().Lister(),
)
} else {
routing = NewClusterIPServiceResolver(kubeInformers.Core().V1().Services().Lister())
}
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"net/url"
"k8s.io/apiserver/pkg/util/proxy"
listersv1 "k8s.io/client-go/listers/core/v1"
)
// A ServiceResolver knows how to get a URL given a service.
type ServiceResolver interface {
ResolveEndpoint(namespace, name string) (*url.URL, error)
}
// NewEndpointServiceResolver returns a ServiceResolver that chooses one of the
// service's endpoints.
func NewEndpointServiceResolver(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister) ServiceResolver {
return &aggregatorEndpointRouting{
services: services,
endpoints: endpoints,
}
}
type aggregatorEndpointRouting struct {
services listersv1.ServiceLister
endpoints listersv1.EndpointsLister
}
func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name)
}
// NewEndpointServiceResolver returns a ServiceResolver that directly calls the
// service's cluster IP.
func NewClusterIPServiceResolver(services listersv1.ServiceLister) ServiceResolver {
return &aggregatorClusterRouting{
services: services,
}
}
type aggregatorClusterRouting struct {
services listersv1.ServiceLister
}
func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveCluster(r.services, namespace, name)
}

View File

@ -608,7 +608,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}

View File

@ -116,7 +116,7 @@ func TestAggregatedAPIServer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}