diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index 63e45a87100..31b6b0f9b9a 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -96,6 +96,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/server/options:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/options/encryptionconfig:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 64e1866fb7e..5ee242e2653 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -52,7 +52,11 @@ import ( "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/options/encryptionconfig" serverstorage "k8s.io/apiserver/pkg/server/storage" + aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" + //aggregatorinformers "k8s.io/kube-aggregator/pkg/client/informers/internalversion" + clientgoinformers "k8s.io/client-go/informers" + clientgoclientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/cmd/kube-apiserver/app/preflight" "k8s.io/kubernetes/pkg/api" @@ -111,7 +115,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { if err != nil { return err } - kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport) + + kubeAPIServerConfig, sharedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport) if err != nil { return err } @@ -154,6 +159,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { if err != nil { return err } + aggregatorConfig.ProxyTransport = proxyTransport + aggregatorConfig.ServiceResolver = serviceResolver aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines @@ -230,27 +237,27 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra } // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them -func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) { +func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { // register all admission plugins registerAllAdmissionPlugins(s.Admission.Plugins) // set defaults in the options before trying to create the generic config if err := defaultOptions(s); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } // validate options if errs := s.Validate(); len(errs) != 0 { - return nil, nil, nil, utilerrors.NewAggregate(errs) + return nil, nil, nil, nil, utilerrors.NewAggregate(errs) } - genericConfig, sharedInformers, insecureServingOptions, err := BuildGenericConfig(s) + genericConfig, sharedInformers, insecureServingOptions, serviceResolver, err := BuildGenericConfig(s) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil { - return nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err) + return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err) } capabilities.Initialize(capabilities.Capabilities{ @@ -266,21 +273,21 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } storageFactory, err := BuildStorageFactory(s) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } config := &master.Config{ @@ -321,30 +328,30 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele config.KubeletClientConfig.Dial = nodeTunneler.Dial } - return config, sharedInformers, insecureServingOptions, nil + return config, sharedInformers, insecureServingOptions, serviceResolver, nil } // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it -func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) { +func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, aggregatorapiserver.ServiceResolver, error) { genericConfig := genericapiserver.NewConfig(api.Codecs) if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } insecureServingOptions, err := s.InsecureServing.ApplyTo(genericConfig) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if err := s.SecureServing.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if err := s.Authentication.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if err := s.Audit.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if err := s.Features.ApplyTo(genericConfig); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme) @@ -362,10 +369,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, storageFactory, err := BuildStorageFactory(s) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } // Use protobufs for self-communication. @@ -378,7 +385,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, if err != nil { kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS") if len(kubeAPIVersions) == 0 { - return nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err) + return nil, nil, nil, nil, fmt.Errorf("failed to create clientset: %v", err) } // KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API @@ -389,18 +396,36 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, } externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err) + return nil, nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err) } sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) + clientgoExternalClient, err := clientgoclientset.NewForConfig(genericConfig.LoopbackClientConfig) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to create real external clientset: %v", err) + } + aggregatorInformers := clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) + + var serviceResolver aggregatorapiserver.ServiceResolver + if s.EnableAggregatorRouting { + serviceResolver = aggregatorapiserver.NewEndpointServiceResolver( + aggregatorInformers.Core().V1().Services().Lister(), + aggregatorInformers.Core().V1().Endpoints().Lister(), + ) + } else { + serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver( + aggregatorInformers.Core().V1().Services().Lister(), + ) + } + genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers) if err != nil { - return nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err) + return nil, nil, nil, nil, fmt.Errorf("invalid authentication config: %v", err) } genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers) if err != nil { - return nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err) + return nil, nil, nil, nil, fmt.Errorf("invalid authorization config: %v", err) } if !sets.NewString(s.Authorization.Modes()...).Has(modes.ModeRBAC) { genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName) @@ -412,22 +437,23 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, externalClient, sharedInformers, genericConfig.Authorizer, + serviceResolver, ) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err) + return nil, nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err) } err = s.Admission.ApplyTo( genericConfig, pluginInitializer) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) + return nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) } - return genericConfig, sharedInformers, insecureServingOptions, nil + return genericConfig, sharedInformers, insecureServingOptions, serviceResolver, nil } // BuildAdmissionPluginInitializer constructs the admission plugin initializer -func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) { +func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer, serviceResolver aggregatorapiserver.ServiceResolver) (admission.PluginInitializer, error) { var cloudConfig []byte if s.CloudProvider.CloudConfigFile != "" { @@ -460,6 +486,8 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna pluginInitializer = pluginInitializer.SetClientCert(certBytes, keyBytes) } + pluginInitializer = pluginInitializer.SetServiceResolver(serviceResolver) + return pluginInitializer, nil } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index 4a747197a8b..f3bf2c678ae 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -37,6 +37,7 @@ go_library( "apiservice_controller.go", "handler_apis.go", "handler_proxy.go", + "resolvers.go", ], tags = ["automanaged"], deps = [ diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 611cfec0d82..db7712dac9b 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "net/http" - "net/url" "sync" "time" @@ -34,10 +33,8 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/apiserver/pkg/util/proxy" kubeinformers "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" - listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/pkg/version" "bytes" @@ -89,19 +86,6 @@ func init() { // legacyAPIServiceName is the fixed name of the only non-groupified API version const legacyAPIServiceName = "v1." -type ServiceResolver interface { - ResolveEndpoint(namespace, name string) (*url.URL, error) -} - -type aggregatorEndpointRouting struct { - services listersv1.ServiceLister - endpoints listersv1.EndpointsLister -} - -type aggregatorClusterRouting struct { - services listersv1.ServiceLister -} - type Config struct { GenericConfig *genericapiserver.Config CoreAPIServerClient kubeclientset.Interface @@ -110,10 +94,19 @@ type Config struct { // this to confirm the proxy's identity ProxyClientCert []byte ProxyClientKey []byte - ProxyTransport *http.Transport - // Indicates if the Aggregator should send to the cluster IP (false) or route to the endpoints IP (true) + // If present, the Dial method will be used for dialing out to delegate + // apiservers. + ProxyTransport *http.Transport + + // Indicates if the Aggregator should send to the service's cluster IP + // (false) or route to the one of the service's endpoint's IP (true); + // if ServiceResolver is provided, then this is ignored. EnableAggregatorRouting bool + + // Mechanism by which the Aggregator will resolve services. If nil, + // constructed based on the value of EnableAggregatorRouting. + ServiceResolver ServiceResolver } // APIAggregator contains state for a Kubernetes cluster master/api server. @@ -186,14 +179,6 @@ func (c *Config) SkipComplete() completedConfig { return completedConfig{c} } -func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) { - return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name) -} - -func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) { - return proxy.ResolveCluster(r.services, namespace, name) -} - // New returns a new instance of APIAggregator from the given config. func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time @@ -211,15 +196,15 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg ) kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute) - var routing ServiceResolver - if c.EnableAggregatorRouting { - routing = &aggregatorEndpointRouting{ - services: kubeInformers.Core().V1().Services().Lister(), - endpoints: kubeInformers.Core().V1().Endpoints().Lister(), - } - } else { - routing = &aggregatorClusterRouting{ - services: kubeInformers.Core().V1().Services().Lister(), + var routing ServiceResolver = c.ServiceResolver + if routing == nil { + if c.EnableAggregatorRouting { + routing = NewEndpointServiceResolver( + kubeInformers.Core().V1().Services().Lister(), + kubeInformers.Core().V1().Endpoints().Lister(), + ) + } else { + routing = NewClusterIPServiceResolver(kubeInformers.Core().V1().Services().Lister()) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/resolvers.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/resolvers.go new file mode 100644 index 00000000000..adeb1f4ee5b --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/resolvers.go @@ -0,0 +1,63 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "net/url" + + "k8s.io/apiserver/pkg/util/proxy" + listersv1 "k8s.io/client-go/listers/core/v1" +) + +// A ServiceResolver knows how to get a URL given a service. +type ServiceResolver interface { + ResolveEndpoint(namespace, name string) (*url.URL, error) +} + +// NewEndpointServiceResolver returns a ServiceResolver that chooses one of the +// service's endpoints. +func NewEndpointServiceResolver(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister) ServiceResolver { + return &aggregatorEndpointRouting{ + services: services, + endpoints: endpoints, + } +} + +type aggregatorEndpointRouting struct { + services listersv1.ServiceLister + endpoints listersv1.EndpointsLister +} + +func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) { + return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name) +} + +// NewEndpointServiceResolver returns a ServiceResolver that directly calls the +// service's cluster IP. +func NewClusterIPServiceResolver(services listersv1.ServiceLister) ServiceResolver { + return &aggregatorClusterRouting{ + services: services, + } +} + +type aggregatorClusterRouting struct { + services listersv1.ServiceLister +} + +func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) { + return proxy.ResolveCluster(r.services, namespace, name) +} diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index e2e56845161..d6353988665 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -608,7 +608,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV if err != nil { t.Fatal(err) } - kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index a940efe81f9..e7ac9ba6aad 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -116,7 +116,7 @@ func TestAggregatedAPIServer(t *testing.T) { if err != nil { t.Fatal(err) } - kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) }