restore working aggregator and avoid duplicate informers

This commit is contained in:
deads2k 2017-06-21 13:02:52 -04:00
parent a3501fb994
commit f525c0815e
10 changed files with 71 additions and 88 deletions

View File

@ -34,7 +34,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
genericoptions "k8s.io/apiserver/pkg/server/options"
kubeclientset "k8s.io/client-go/kubernetes"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
@ -45,7 +45,7 @@ import (
"k8s.io/kubernetes/pkg/master/thirdparty"
)
func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions, proxyTransport *http.Transport) (*aggregatorapiserver.Config, error) {
func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions, externalInformers kubeexternalinformers.SharedInformerFactory, serviceResolver aggregatorapiserver.ServiceResolver, proxyTransport *http.Transport) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
genericConfig := kubeAPIServerConfig
@ -60,11 +60,7 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
etcdOptions.StorageConfig.Copier = aggregatorapiserver.Scheme
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
client, err := kubeclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
var err error
var certBytes, keyBytes []byte
if len(commandOptions.ProxyClientCertFile) > 0 && len(commandOptions.ProxyClientKeyFile) > 0 {
certBytes, err = ioutil.ReadFile(commandOptions.ProxyClientCertFile)
@ -78,12 +74,12 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
}
aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
ProxyTransport: proxyTransport,
EnableAggregatorRouting: commandOptions.EnableAggregatorRouting,
GenericConfig: &genericConfig,
CoreKubeInformers: externalInformers,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
}
return aggregatorConfig, nil

View File

@ -116,7 +116,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
return err
}
kubeAPIServerConfig, sharedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
if err != nil {
return err
}
@ -155,7 +155,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
kubeAPIServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, proxyTransport)
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport)
if err != nil {
return err
}
@ -237,27 +237,27 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra
}
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
// register all admission plugins
registerAllAdmissionPlugins(s.Admission.Plugins)
// set defaults in the options before trying to create the generic config
if err := defaultOptions(s); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
// validate options
if errs := s.Validate(); len(errs) != 0 {
return nil, nil, nil, nil, utilerrors.NewAggregate(errs)
return nil, nil, nil, nil, nil, utilerrors.NewAggregate(errs)
}
genericConfig, sharedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s)
genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
}
capabilities.Initialize(capabilities.Capabilities{
@ -273,21 +273,21 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
config := &master.Config{
@ -328,30 +328,30 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
config.KubeletClientConfig.Dial = nodeTunneler.Dial
}
return config, sharedInformers, insecureServingOptions, serviceResolver, nil
return config, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil
}
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) {
genericConfig := genericapiserver.NewConfig(api.Codecs)
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
insecureServingOptions, err := s.InsecureServing.ApplyTo(genericConfig)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
if err := s.Audit.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
if err := s.Features.ApplyTo(genericConfig); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
@ -369,10 +369,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
storageFactory, err := BuildStorageFactory(s)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
// Use protobufs for self-communication.
@ -385,7 +385,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
if err != nil {
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
if len(kubeAPIVersions) == 0 {
return nil, nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err)
}
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
@ -396,36 +396,36 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
}
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
clientgoExternalClient, err := clientgoclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create real external clientset: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("failed to create real external clientset: %v", err)
}
aggregatorInformers := clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
versionedInformers := clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
var serviceResolver aggregatorapiserver.ServiceResolver
if s.EnableAggregatorRouting {
serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
aggregatorInformers.Core().V1().Services().Lister(),
aggregatorInformers.Core().V1().Endpoints().Lister(),
versionedInformers.Core().V1().Services().Lister(),
versionedInformers.Core().V1().Endpoints().Lister(),
)
} else {
serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
aggregatorInformers.Core().V1().Services().Lister(),
versionedInformers.Core().V1().Services().Lister(),
)
}
genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err)
}
genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err)
}
if !sets.NewString(s.Authorization.Modes()...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
@ -440,16 +440,16 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
serviceResolver,
)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}
err = s.Admission.ApplyTo(
genericConfig,
pluginInitializer)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
}
return genericConfig, sharedInformers, insecureServingOptions, serviceResolver, nil
return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil
}
// BuildAdmissionPluginInitializer constructs the admission plugin initializer

View File

@ -69,7 +69,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/util/proxy:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/version:go_default_library",

View File

@ -34,7 +34,6 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
"bytes"
@ -87,8 +86,10 @@ func init() {
const legacyAPIServiceName = "v1."
type Config struct {
GenericConfig *genericapiserver.Config
CoreAPIServerClient kubeclientset.Interface
GenericConfig *genericapiserver.Config
// CoreKubeInformers is used to watch kube resources
CoreKubeInformers kubeinformers.SharedInformerFactory
// ProxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
// this to confirm the proxy's identity
@ -99,13 +100,7 @@ type Config struct {
// apiservers.
ProxyTransport *http.Transport
// Indicates if the Aggregator should send to the service's cluster IP
// (false) or route to the one of the service's endpoint's IP (true);
// if ServiceResolver is provided, then this is ignored.
EnableAggregatorRouting bool
// Mechanism by which the Aggregator will resolve services. If nil,
// constructed based on the value of EnableAggregatorRouting.
// Mechanism by which the Aggregator will resolve services. Required.
ServiceResolver ServiceResolver
}
@ -154,7 +149,7 @@ type APIAggregator struct {
APIRegistrationInformers informers.SharedInformerFactory
// Information needed to determine routing for the aggregator
routing ServiceResolver
serviceResolver ServiceResolver
}
type completedConfig struct {
@ -194,19 +189,6 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
apiregistrationClient,
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
)
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
var routing ServiceResolver = c.ServiceResolver
if routing == nil {
if c.EnableAggregatorRouting {
routing = NewEndpointServiceResolver(
kubeInformers.Core().V1().Services().Lister(),
kubeInformers.Core().V1().Endpoints().Lister(),
)
} else {
routing = NewClusterIPServiceResolver(kubeInformers.Core().V1().Services().Lister())
}
}
s := &APIAggregator{
GenericAPIServer: genericServer,
@ -222,7 +204,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
routing: routing,
serviceResolver: c.ServiceResolver,
}
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs)
@ -245,17 +227,17 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s)
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), c.CoreKubeInformers.Core().V1().Services(), s)
availableController := statuscontrollers.NewAvailableConditionController(
informerFactory.Apiregistration().InternalVersion().APIServices(),
kubeInformers.Core().V1().Services(),
kubeInformers.Core().V1().Endpoints(),
c.CoreKubeInformers.Core().V1().Services(),
c.CoreKubeInformers.Core().V1().Endpoints(),
apiregistrationClient.Apiregistration(),
)
s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.StopCh)
kubeInformers.Start(context.StopCh)
c.CoreKubeInformers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
@ -307,7 +289,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey,
proxyTransport: s.proxyTransport,
routing: s.routing,
serviceResolver: s.serviceResolver,
}
proxyHandler.updateAPIService(apiService)
s.proxyHandlers[apiService.Name] = proxyHandler
@ -384,7 +366,7 @@ func (_ *APIAggregator) loadOpenAPISpec(p *proxyHandler, r *http.Request) (*spec
if handlingInfo.local {
return nil, nil
}
loc, err := p.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
loc, err := p.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
if err != nil {
return nil, fmt.Errorf("missing route")
}

View File

@ -54,7 +54,7 @@ type proxyHandler struct {
proxyTransport *http.Transport
// Endpoints based routing to map from cluster IP to routable IP
routing ServiceResolver
serviceResolver ServiceResolver
handlingInfo atomic.Value
}
@ -111,7 +111,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// write a new location based on the existing request pointed at the target service
location := &url.URL{}
location.Scheme = "https"
rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
if err != nil {
http.Error(w, fmt.Sprintf("missing route (%s)", err.Error()), http.StatusInternalServerError)
return

View File

@ -170,9 +170,9 @@ func TestProxyHandler(t *testing.T) {
func() {
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()},
proxyTransport: &http.Transport{},
localDelegate: http.NewServeMux(),
serviceResolver: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()},
proxyTransport: &http.Transport{},
}
handler.contextMapper = &fakeRequestContextMapper{user: tc.user}
server := httptest.NewServer(handler)

View File

@ -18,6 +18,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",

View File

@ -20,6 +20,7 @@ import (
"fmt"
"io"
"io/ioutil"
"time"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@ -28,6 +29,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@ -142,10 +144,13 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
if err != nil {
return err
}
kubeInformers := kubeinformers.NewSharedInformerFactory(coreAPIServerClient, 5*time.Minute)
serviceResolver := apiserver.NewClusterIPServiceResolver(kubeInformers.Core().V1().Services().Lister())
config := apiserver.Config{
GenericConfig: serverConfig,
CoreAPIServerClient: coreAPIServerClient,
GenericConfig: serverConfig,
CoreKubeInformers: kubeInformers,
ServiceResolver: serviceResolver,
}
config.ProxyClientCert, err = ioutil.ReadFile(o.ProxyClientCertFile)

View File

@ -608,7 +608,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}

View File

@ -116,7 +116,7 @@ func TestAggregatedAPIServer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}