From 3414231672e59b750fe135bdccc076eb3a72498d Mon Sep 17 00:00:00 2001 From: deads2k Date: Mon, 20 Mar 2017 10:58:27 -0400 Subject: [PATCH] proxy to IP instead of name, but still use host verification --- cmd/kube-apiserver/app/server.go | 1 + .../pkg/apiserver/apiserver.go | 8 +- .../pkg/apiserver/apiservice_controller.go | 110 +++++++++++++- .../apiserver/apiservice_controller_test.go | 139 ++++++++++++++++++ .../pkg/apiserver/handler_proxy.go | 13 +- .../pkg/apiserver/handler_proxy_test.go | 2 +- test/integration/examples/apiserver_test.go | 10 +- vendor/BUILD | 3 + 8 files changed, 260 insertions(+), 26 deletions(-) create mode 100644 staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller_test.go diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 43879a345ec..c255ea2cdd0 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -119,6 +119,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { } aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh) if err != nil { + // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines return err } return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index fb0e575a8d0..313c0c55bd4 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -179,7 +179,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg } s.GenericAPIServer.HandlerContainer.Handle("/apis", apisHandler) - apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s) + apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s) s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { informerFactory.Start(stopCh) @@ -196,11 +196,11 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. // It's a slow moving API, so its ok to run the controller on a single thread -func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { +func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, destinationHost string) { // if the proxyHandler already exists, it needs to be updated. The aggregation bits do not // since they are wired against listers because they require multiple resources to respond if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists { - proxyHandler.updateAPIService(apiService) + proxyHandler.updateAPIService(apiService, destinationHost) return } @@ -217,7 +217,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { proxyClientCert: s.proxyClientCert, proxyClientKey: s.proxyClientKey, } - proxyHandler.updateAPIService(apiService) + proxyHandler.updateAPIService(apiService, destinationHost) s.proxyHandlers[apiService.Name] = proxyHandler s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath, proxyHandler) s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath+"/", proxyHandler) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go index 5cc31b93546..b826a01178f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go @@ -23,8 +23,12 @@ import ( "github.com/golang/glog" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + v1informers "k8s.io/client-go/informers/core/v1" + v1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -34,14 +38,19 @@ import ( ) type APIHandlerManager interface { - AddAPIService(apiServer *apiregistration.APIService) - RemoveAPIService(apiServerName string) + AddAPIService(apiService *apiregistration.APIService, destinationHost string) + RemoveAPIService(apiServiceName string) } type APIServiceRegistrationController struct { apiHandlerManager APIHandlerManager - apiServerLister listers.APIServiceLister + apiServiceLister listers.APIServiceLister + apiServiceSynced cache.InformerSynced + + // serviceLister is used to get the IP to create the transport for + serviceLister v1listers.ServiceLister + servicesSynced cache.InformerSynced // To allow injection for testing. syncFn func(key string) error @@ -49,26 +58,35 @@ type APIServiceRegistrationController struct { queue workqueue.RateLimitingInterface } -func NewAPIServiceRegistrationController(apiServerInformer informers.APIServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController { +func NewAPIServiceRegistrationController(apiServiceInformer informers.APIServiceInformer, serviceInformer v1informers.ServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController { c := &APIServiceRegistrationController{ apiHandlerManager: apiHandlerManager, - apiServerLister: apiServerInformer.Lister(), + apiServiceLister: apiServiceInformer.Lister(), + apiServiceSynced: apiServiceInformer.Informer().HasSynced, + serviceLister: serviceInformer.Lister(), + servicesSynced: serviceInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"), } - apiServerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addAPIService, UpdateFunc: c.updateAPIService, DeleteFunc: c.deleteAPIService, }) + serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addService, + UpdateFunc: c.updateService, + DeleteFunc: c.deleteService, + }) + c.syncFn = c.sync return c } func (c *APIServiceRegistrationController) sync(key string) error { - apiServer, err := c.apiServerLister.Get(key) + apiService, err := c.apiServiceLister.Get(key) if apierrors.IsNotFound(err) { c.apiHandlerManager.RemoveAPIService(key) return nil @@ -77,10 +95,32 @@ func (c *APIServiceRegistrationController) sync(key string) error { return err } - c.apiHandlerManager.AddAPIService(apiServer) + c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService)) return nil } +func (c *APIServiceRegistrationController) getDestinationHost(apiService *apiregistration.APIService) string { + if apiService.Spec.Service == nil { + return "" + } + + destinationHost := apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc" + service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name) + if err != nil { + return destinationHost + } + switch { + // use IP from a clusterIP for these service types + case service.Spec.Type == v1.ServiceTypeClusterIP, + service.Spec.Type == v1.ServiceTypeNodePort, + service.Spec.Type == v1.ServiceTypeLoadBalancer: + return service.Spec.ClusterIP + } + + // return the normal DNS name by default + return destinationHost +} + func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -88,6 +128,11 @@ func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) { glog.Infof("Starting APIServiceRegistrationController") + if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced, c.servicesSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + // only start one worker thread since its a slow moving API and the aggregation server adding bits // aren't threadsafe go wait.Until(c.runWorker, time.Second, stopCh) @@ -159,3 +204,52 @@ func (c *APIServiceRegistrationController) deleteAPIService(obj interface{}) { glog.V(4).Infof("Deleting %q", castObj.Name) c.enqueue(castObj) } + +// there aren't very many apiservices, just check them all. +func (c *APIServiceRegistrationController) getAPIServicesFor(service *v1.Service) []*apiregistration.APIService { + var ret []*apiregistration.APIService + apiServiceList, _ := c.apiServiceLister.List(labels.Everything()) + for _, apiService := range apiServiceList { + if apiService.Spec.Service == nil { + continue + } + if apiService.Spec.Service.Namespace == service.Namespace && apiService.Spec.Service.Name == service.Name { + ret = append(ret, apiService) + } + } + + return ret +} + +// TODO, think of a way to avoid checking on every service manipulation + +func (c *APIServiceRegistrationController) addService(obj interface{}) { + for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) { + c.enqueue(apiService) + } +} + +func (c *APIServiceRegistrationController) updateService(obj, _ interface{}) { + for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) { + c.enqueue(apiService) + } +} + +func (c *APIServiceRegistrationController) deleteService(obj interface{}) { + castObj, ok := obj.(*v1.Service) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + castObj, ok = tombstone.Obj.(*v1.Service) + if !ok { + glog.Errorf("Tombstone contained object that is not expected %#v", obj) + return + } + } + for _, apiService := range c.getAPIServicesFor(castObj) { + c.enqueue(apiService) + } +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller_test.go new file mode 100644 index 00000000000..0084802bbd7 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller_test.go @@ -0,0 +1,139 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/cache" + + "k8s.io/kube-aggregator/pkg/apis/apiregistration" +) + +func TestGetDestinationHost(t *testing.T) { + tests := []struct { + name string + services []*v1.Service + apiService *apiregistration.APIService + + expected string + }{ + { + name: "cluster ip", + services: []*v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"}, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeClusterIP, + ClusterIP: "hit", + }, + }, + }, + apiService: &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1."}, + Spec: apiregistration.APIServiceSpec{ + Service: &apiregistration.ServiceReference{ + Namespace: "one", + Name: "alfa", + }, + }, + }, + + expected: "hit", + }, + { + name: "loadbalancer", + services: []*v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"}, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeLoadBalancer, + ClusterIP: "lb", + }, + }, + }, + apiService: &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1."}, + Spec: apiregistration.APIServiceSpec{ + Service: &apiregistration.ServiceReference{ + Namespace: "one", + Name: "alfa", + }, + }, + }, + + expected: "lb", + }, + { + name: "node port", + services: []*v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"}, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeNodePort, + ClusterIP: "np", + }, + }, + }, + apiService: &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1."}, + Spec: apiregistration.APIServiceSpec{ + Service: &apiregistration.ServiceReference{ + Namespace: "one", + Name: "alfa", + }, + }, + }, + + expected: "np", + }, + { + name: "missing service", + apiService: &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1."}, + Spec: apiregistration.APIServiceSpec{ + Service: &apiregistration.ServiceReference{ + Namespace: "one", + Name: "alfa", + }, + }, + }, + + expected: "alfa.one.svc", + }, + } + + for _, test := range tests { + serviceCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + serviceLister := v1listers.NewServiceLister(serviceCache) + c := &APIServiceRegistrationController{ + serviceLister: serviceLister, + } + for i := range test.services { + serviceCache.Add(test.services[i]) + } + + actual := c.getDestinationHost(test.apiService) + if actual != test.expected { + t.Errorf("%s expected %v, got %v", test.name, test.expected, actual) + } + + } +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 9622a31d53f..d3fdc92a505 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -176,20 +176,21 @@ func (r *responder) Error(err error) { // these methods provide locked access to fields -func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) { +func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService, destinationHost string) { if apiService.Spec.Service == nil { r.handlingInfo.Store(proxyHandlingInfo{local: true}) return } newInfo := proxyHandlingInfo{ - destinationHost: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc", + destinationHost: destinationHost, restConfig: &restclient.Config{ TLSClientConfig: restclient.TLSClientConfig{ - Insecure: apiService.Spec.InsecureSkipTLSVerify, - CertData: r.proxyClientCert, - KeyData: r.proxyClientKey, - CAData: apiService.Spec.CABundle, + Insecure: apiService.Spec.InsecureSkipTLSVerify, + ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc", + CertData: r.proxyClientCert, + KeyData: r.proxyClientKey, + CAData: apiService.Spec.CABundle, }, }, } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index ca1e547af74..e819246d53d 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -164,7 +164,7 @@ func TestProxyHandler(t *testing.T) { handler.contextMapper = &fakeRequestContextMapper{user: tc.user} handler.removeAPIService() if tc.apiService != nil { - handler.updateAPIService(tc.apiService) + handler.updateAPIService(tc.apiService, tc.apiService.Spec.Service.Name+"."+tc.apiService.Spec.Service.Namespace+".svc") curr := handler.handlingInfo.Load().(proxyHandlingInfo) curr.destinationHost = targetServer.Listener.Addr().String() handler.handlingInfo.Store(curr) diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index ad6dea52a18..356c469d7d9 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -25,7 +25,6 @@ import ( "os" "path" "strconv" - "strings" "sync/atomic" "testing" "time" @@ -331,13 +330,10 @@ func TestAggregatedAPIServer(t *testing.T) { _, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{ ObjectMeta: metav1.ObjectMeta{Name: "v1."}, Spec: apiregistrationv1alpha1.APIServiceSpec{ - Service: &apiregistrationv1alpha1.ServiceReference{ - Namespace: "default", - Name: "kubernetes", - }, + // register this as a loca service so it doesn't try to lookup the default kubernetes service + // which will have an unroutable IP address since its fake. Group: "", Version: "v1", - CABundle: kubeClientConfig.CAData, Priority: 100, }, }) @@ -349,7 +345,7 @@ func TestAggregatedAPIServer(t *testing.T) { // (the service is missing), we don't have an external signal. time.Sleep(100 * time.Millisecond) _, err = aggregatorDiscoveryClient.Discovery().ServerResources() - if err != nil && !strings.Contains(err.Error(), "lookup kubernetes.default.svc") { + if err != nil { t.Fatal(err) } diff --git a/vendor/BUILD b/vendor/BUILD index 5e100761b1e..d3fd104bf48 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -15456,6 +15456,7 @@ go_library( go_test( name = "k8s.io/kube-aggregator/pkg/apiserver_test", srcs = [ + "k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller_test.go", "k8s.io/kube-aggregator/pkg/apiserver/handler_apis_test.go", "k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go", ], @@ -15506,8 +15507,10 @@ go_library( "//vendor:k8s.io/apiserver/pkg/registry/rest", "//vendor:k8s.io/apiserver/pkg/server", "//vendor:k8s.io/client-go/informers", + "//vendor:k8s.io/client-go/informers/core/v1", "//vendor:k8s.io/client-go/kubernetes", "//vendor:k8s.io/client-go/listers/core/v1", + "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/pkg/version", "//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/tools/cache",