From f525c0815e06a3e32971756047fb2f8135fbfe11 Mon Sep 17 00:00:00 2001 From: deads2k Date: Wed, 21 Jun 2017 13:02:52 -0400 Subject: [PATCH] restore working aggregator and avoid duplicate informers --- cmd/kube-apiserver/app/aggregator.go | 22 +++--- cmd/kube-apiserver/app/server.go | 68 +++++++++---------- .../kube-aggregator/pkg/apiserver/BUILD | 1 - .../pkg/apiserver/apiserver.go | 44 ++++-------- .../pkg/apiserver/handler_proxy.go | 4 +- .../pkg/apiserver/handler_proxy_test.go | 6 +- .../kube-aggregator/pkg/cmd/server/BUILD | 1 + .../kube-aggregator/pkg/cmd/server/start.go | 9 ++- .../etcd/etcd_storage_path_test.go | 2 +- test/integration/examples/apiserver_test.go | 2 +- 10 files changed, 71 insertions(+), 88 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index e3f413b1c9a..e146d98c9b9 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -34,7 +34,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" genericoptions "k8s.io/apiserver/pkg/server/options" - kubeclientset "k8s.io/client-go/kubernetes" + kubeexternalinformers "k8s.io/client-go/informers" "k8s.io/kube-aggregator/pkg/apis/apiregistration" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" @@ -45,7 +45,7 @@ import ( "k8s.io/kubernetes/pkg/master/thirdparty" ) -func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions, proxyTransport *http.Transport) (*aggregatorapiserver.Config, error) { +func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions, externalInformers kubeexternalinformers.SharedInformerFactory, serviceResolver aggregatorapiserver.ServiceResolver, proxyTransport *http.Transport) (*aggregatorapiserver.Config, error) { // make a shallow copy to let us twiddle a few things // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator genericConfig := kubeAPIServerConfig @@ -60,11 +60,7 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command etcdOptions.StorageConfig.Copier = aggregatorapiserver.Scheme genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions} - client, err := kubeclientset.NewForConfig(genericConfig.LoopbackClientConfig) - if err != nil { - return nil, err - } - + var err error var certBytes, keyBytes []byte if len(commandOptions.ProxyClientCertFile) > 0 && len(commandOptions.ProxyClientKeyFile) > 0 { certBytes, err = ioutil.ReadFile(commandOptions.ProxyClientCertFile) @@ -78,12 +74,12 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command } aggregatorConfig := &aggregatorapiserver.Config{ - GenericConfig: &genericConfig, - CoreAPIServerClient: client, - ProxyClientCert: certBytes, - ProxyClientKey: keyBytes, - ProxyTransport: proxyTransport, - EnableAggregatorRouting: commandOptions.EnableAggregatorRouting, + GenericConfig: &genericConfig, + CoreKubeInformers: externalInformers, + ProxyClientCert: certBytes, + ProxyClientKey: keyBytes, + ServiceResolver: serviceResolver, + ProxyTransport: proxyTransport, } return aggregatorConfig, nil diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 5ee242e2653..a5c03e4ef75 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -116,7 +116,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { return err } - kubeAPIServerConfig, sharedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport) if err != nil { return err } @@ -155,7 +155,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { kubeAPIServer.GenericAPIServer.PrepareRun() // aggregator comes last in the chain - aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, proxyTransport) + aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport) if err != nil { return err } @@ -237,27 +237,27 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra } // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them -func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { +func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { // register all admission plugins registerAllAdmissionPlugins(s.Admission.Plugins) // set defaults in the options before trying to create the generic config if err := defaultOptions(s); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } // validate options if errs := s.Validate(); len(errs) != 0 { - return nil, nil, nil, nil, utilerrors.NewAggregate(errs) + return nil, nil, nil, nil, nil, utilerrors.NewAggregate(errs) } - genericConfig, sharedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s) + genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil { - return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err) } capabilities.Initialize(capabilities.Capabilities{ @@ -273,21 +273,21 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } storageFactory, err := BuildStorageFactory(s) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } config := &master.Config{ @@ -328,30 +328,30 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele config.KubeletClientConfig.Dial = nodeTunneler.Dial } - return config, sharedInformers, insecureServingOptions, serviceResolver, nil + return config, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil } // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it -func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { +func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, clientgoinformers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { genericConfig := genericapiserver.NewConfig(api.Codecs) if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } insecureServingOptions, err := s.InsecureServing.ApplyTo(genericConfig) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } if err := s.SecureServing.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } if err := s.Authentication.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } if err := s.Audit.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } if err := s.Features.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme) @@ -369,10 +369,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, storageFactory, err := BuildStorageFactory(s) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } // Use protobufs for self-communication. @@ -385,7 +385,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, if err != nil { kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS") if len(kubeAPIVersions) == 0 { - return nil, nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err) } // KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API @@ -396,36 +396,36 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, } externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err) } sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) clientgoExternalClient, err := clientgoclientset.NewForConfig(genericConfig.LoopbackClientConfig) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("failed to create real external clientset: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("failed to create real external clientset: %v", err) } - aggregatorInformers := clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) + versionedInformers := clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) var serviceResolver aggregatorapiserver.ServiceResolver if s.EnableAggregatorRouting { serviceResolver = aggregatorapiserver.NewEndpointServiceResolver( - aggregatorInformers.Core().V1().Services().Lister(), - aggregatorInformers.Core().V1().Endpoints().Lister(), + versionedInformers.Core().V1().Services().Lister(), + versionedInformers.Core().V1().Endpoints().Lister(), ) } else { serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver( - aggregatorInformers.Core().V1().Services().Lister(), + versionedInformers.Core().V1().Services().Lister(), ) } genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err) } genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err) } if !sets.NewString(s.Authorization.Modes()...).Has(modes.ModeRBAC) { genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName) @@ -440,16 +440,16 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, serviceResolver, ) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err) } err = s.Admission.ApplyTo( genericConfig, pluginInitializer) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) + return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) } - return genericConfig, sharedInformers, insecureServingOptions, serviceResolver, nil + return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil } // BuildAdmissionPluginInitializer constructs the admission plugin initializer diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index f3bf2c678ae..bea2e5baf35 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -69,7 +69,6 @@ go_library( "//vendor/k8s.io/apiserver/pkg/util/proxy:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/pkg/version:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index db7712dac9b..5e07d36ecbf 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -34,7 +34,6 @@ import ( "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" kubeinformers "k8s.io/client-go/informers" - kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/version" "bytes" @@ -87,8 +86,10 @@ func init() { const legacyAPIServiceName = "v1." type Config struct { - GenericConfig *genericapiserver.Config - CoreAPIServerClient kubeclientset.Interface + GenericConfig *genericapiserver.Config + + // CoreKubeInformers is used to watch kube resources + CoreKubeInformers kubeinformers.SharedInformerFactory // ProxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use // this to confirm the proxy's identity @@ -99,13 +100,7 @@ type Config struct { // apiservers. ProxyTransport *http.Transport - // Indicates if the Aggregator should send to the service's cluster IP - // (false) or route to the one of the service's endpoint's IP (true); - // if ServiceResolver is provided, then this is ignored. - EnableAggregatorRouting bool - - // Mechanism by which the Aggregator will resolve services. If nil, - // constructed based on the value of EnableAggregatorRouting. + // Mechanism by which the Aggregator will resolve services. Required. ServiceResolver ServiceResolver } @@ -154,7 +149,7 @@ type APIAggregator struct { APIRegistrationInformers informers.SharedInformerFactory // Information needed to determine routing for the aggregator - routing ServiceResolver + serviceResolver ServiceResolver } type completedConfig struct { @@ -194,19 +189,6 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg apiregistrationClient, 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. ) - kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute) - - var routing ServiceResolver = c.ServiceResolver - if routing == nil { - if c.EnableAggregatorRouting { - routing = NewEndpointServiceResolver( - kubeInformers.Core().V1().Services().Lister(), - kubeInformers.Core().V1().Endpoints().Lister(), - ) - } else { - routing = NewClusterIPServiceResolver(kubeInformers.Core().V1().Services().Lister()) - } - } s := &APIAggregator{ GenericAPIServer: genericServer, @@ -222,7 +204,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg handledGroups: sets.String{}, lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), APIRegistrationInformers: informerFactory, - routing: routing, + serviceResolver: c.ServiceResolver, } apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs) @@ -245,17 +227,17 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler) - apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s) + apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), c.CoreKubeInformers.Core().V1().Services(), s) availableController := statuscontrollers.NewAvailableConditionController( informerFactory.Apiregistration().InternalVersion().APIServices(), - kubeInformers.Core().V1().Services(), - kubeInformers.Core().V1().Endpoints(), + c.CoreKubeInformers.Core().V1().Services(), + c.CoreKubeInformers.Core().V1().Endpoints(), apiregistrationClient.Apiregistration(), ) s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { informerFactory.Start(context.StopCh) - kubeInformers.Start(context.StopCh) + c.CoreKubeInformers.Start(context.StopCh) return nil }) s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { @@ -307,7 +289,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { proxyClientCert: s.proxyClientCert, proxyClientKey: s.proxyClientKey, proxyTransport: s.proxyTransport, - routing: s.routing, + serviceResolver: s.serviceResolver, } proxyHandler.updateAPIService(apiService) s.proxyHandlers[apiService.Name] = proxyHandler @@ -384,7 +366,7 @@ func (_ *APIAggregator) loadOpenAPISpec(p *proxyHandler, r *http.Request) (*spec if handlingInfo.local { return nil, nil } - loc, err := p.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) + loc, err := p.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) if err != nil { return nil, fmt.Errorf("missing route") } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 43e99f88db6..fdde1897876 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -54,7 +54,7 @@ type proxyHandler struct { proxyTransport *http.Transport // Endpoints based routing to map from cluster IP to routable IP - routing ServiceResolver + serviceResolver ServiceResolver handlingInfo atomic.Value } @@ -111,7 +111,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // write a new location based on the existing request pointed at the target service location := &url.URL{} location.Scheme = "https" - rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) + rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) if err != nil { http.Error(w, fmt.Sprintf("missing route (%s)", err.Error()), http.StatusInternalServerError) return diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index b4258ecdd2d..482c971ed8a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -170,9 +170,9 @@ func TestProxyHandler(t *testing.T) { func() { handler := &proxyHandler{ - localDelegate: http.NewServeMux(), - routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}, - proxyTransport: &http.Transport{}, + localDelegate: http.NewServeMux(), + serviceResolver: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}, + proxyTransport: &http.Transport{}, } handler.contextMapper = &fakeRequestContextMapper{user: tc.user} server := httptest.NewServer(handler) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/BUILD index d1f0adb6edb..b495b42d0c1 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/BUILD @@ -18,6 +18,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/options:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index ab0b153ae5b..c5f39f0a3c9 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "io/ioutil" + "time" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -28,6 +29,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/filters" genericoptions "k8s.io/apiserver/pkg/server/options" + kubeinformers "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -142,10 +144,13 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { if err != nil { return err } + kubeInformers := kubeinformers.NewSharedInformerFactory(coreAPIServerClient, 5*time.Minute) + serviceResolver := apiserver.NewClusterIPServiceResolver(kubeInformers.Core().V1().Services().Lister()) config := apiserver.Config{ - GenericConfig: serverConfig, - CoreAPIServerClient: coreAPIServerClient, + GenericConfig: serverConfig, + CoreKubeInformers: kubeInformers, + ServiceResolver: serviceResolver, } config.ProxyClientCert, err = ioutil.ReadFile(o.ProxyClientCertFile) diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index d6353988665..e3c1efd0c59 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -608,7 +608,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV if err != nil { t.Fatal(err) } - kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index e7ac9ba6aad..26f140cbdcd 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -116,7 +116,7 @@ func TestAggregatedAPIServer(t *testing.T) { if err != nil { t.Fatal(err) } - kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) }