proxy to IP instead of name, but still use host verification

This commit is contained in:
deads2k 2017-03-20 10:58:27 -04:00
parent b705835bae
commit 3414231672
8 changed files with 260 additions and 26 deletions

View File

@ -119,6 +119,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
} }
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh) aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh)
if err != nil { if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err return err
} }
return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh) return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)

View File

@ -179,7 +179,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
} }
s.GenericAPIServer.HandlerContainer.Handle("/apis", apisHandler) s.GenericAPIServer.HandlerContainer.Handle("/apis", apisHandler)
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s) apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s)
s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(stopCh) informerFactory.Start(stopCh)
@ -196,11 +196,11 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread // It's a slow moving API, so its ok to run the controller on a single thread
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, destinationHost string) {
// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not // if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
// since they are wired against listers because they require multiple resources to respond // since they are wired against listers because they require multiple resources to respond
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists { if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
proxyHandler.updateAPIService(apiService) proxyHandler.updateAPIService(apiService, destinationHost)
return return
} }
@ -217,7 +217,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
proxyClientCert: s.proxyClientCert, proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey, proxyClientKey: s.proxyClientKey,
} }
proxyHandler.updateAPIService(apiService) proxyHandler.updateAPIService(apiService, destinationHost)
s.proxyHandlers[apiService.Name] = proxyHandler s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath, proxyHandler) s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath+"/", proxyHandler) s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath+"/", proxyHandler)

View File

@ -23,8 +23,12 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
v1informers "k8s.io/client-go/informers/core/v1"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -34,14 +38,19 @@ import (
) )
type APIHandlerManager interface { type APIHandlerManager interface {
AddAPIService(apiServer *apiregistration.APIService) AddAPIService(apiService *apiregistration.APIService, destinationHost string)
RemoveAPIService(apiServerName string) RemoveAPIService(apiServiceName string)
} }
type APIServiceRegistrationController struct { type APIServiceRegistrationController struct {
apiHandlerManager APIHandlerManager apiHandlerManager APIHandlerManager
apiServerLister listers.APIServiceLister apiServiceLister listers.APIServiceLister
apiServiceSynced cache.InformerSynced
// serviceLister is used to get the IP to create the transport for
serviceLister v1listers.ServiceLister
servicesSynced cache.InformerSynced
// To allow injection for testing. // To allow injection for testing.
syncFn func(key string) error syncFn func(key string) error
@ -49,26 +58,35 @@ type APIServiceRegistrationController struct {
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
} }
func NewAPIServiceRegistrationController(apiServerInformer informers.APIServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController { func NewAPIServiceRegistrationController(apiServiceInformer informers.APIServiceInformer, serviceInformer v1informers.ServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController {
c := &APIServiceRegistrationController{ c := &APIServiceRegistrationController{
apiHandlerManager: apiHandlerManager, apiHandlerManager: apiHandlerManager,
apiServerLister: apiServerInformer.Lister(), apiServiceLister: apiServiceInformer.Lister(),
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
serviceLister: serviceInformer.Lister(),
servicesSynced: serviceInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"),
} }
apiServerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addAPIService, AddFunc: c.addAPIService,
UpdateFunc: c.updateAPIService, UpdateFunc: c.updateAPIService,
DeleteFunc: c.deleteAPIService, DeleteFunc: c.deleteAPIService,
}) })
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addService,
UpdateFunc: c.updateService,
DeleteFunc: c.deleteService,
})
c.syncFn = c.sync c.syncFn = c.sync
return c return c
} }
func (c *APIServiceRegistrationController) sync(key string) error { func (c *APIServiceRegistrationController) sync(key string) error {
apiServer, err := c.apiServerLister.Get(key) apiService, err := c.apiServiceLister.Get(key)
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
c.apiHandlerManager.RemoveAPIService(key) c.apiHandlerManager.RemoveAPIService(key)
return nil return nil
@ -77,10 +95,32 @@ func (c *APIServiceRegistrationController) sync(key string) error {
return err return err
} }
c.apiHandlerManager.AddAPIService(apiServer) c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService))
return nil return nil
} }
func (c *APIServiceRegistrationController) getDestinationHost(apiService *apiregistration.APIService) string {
if apiService.Spec.Service == nil {
return ""
}
destinationHost := apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc"
service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
if err != nil {
return destinationHost
}
switch {
// use IP from a clusterIP for these service types
case service.Spec.Type == v1.ServiceTypeClusterIP,
service.Spec.Type == v1.ServiceTypeNodePort,
service.Spec.Type == v1.ServiceTypeLoadBalancer:
return service.Spec.ClusterIP
}
// return the normal DNS name by default
return destinationHost
}
func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) { func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer c.queue.ShutDown() defer c.queue.ShutDown()
@ -88,6 +128,11 @@ func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
glog.Infof("Starting APIServiceRegistrationController") glog.Infof("Starting APIServiceRegistrationController")
if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced, c.servicesSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// only start one worker thread since its a slow moving API and the aggregation server adding bits // only start one worker thread since its a slow moving API and the aggregation server adding bits
// aren't threadsafe // aren't threadsafe
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)
@ -159,3 +204,52 @@ func (c *APIServiceRegistrationController) deleteAPIService(obj interface{}) {
glog.V(4).Infof("Deleting %q", castObj.Name) glog.V(4).Infof("Deleting %q", castObj.Name)
c.enqueue(castObj) c.enqueue(castObj)
} }
// there aren't very many apiservices, just check them all.
func (c *APIServiceRegistrationController) getAPIServicesFor(service *v1.Service) []*apiregistration.APIService {
var ret []*apiregistration.APIService
apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
for _, apiService := range apiServiceList {
if apiService.Spec.Service == nil {
continue
}
if apiService.Spec.Service.Namespace == service.Namespace && apiService.Spec.Service.Name == service.Name {
ret = append(ret, apiService)
}
}
return ret
}
// TODO, think of a way to avoid checking on every service manipulation
func (c *APIServiceRegistrationController) addService(obj interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
c.enqueue(apiService)
}
}
func (c *APIServiceRegistrationController) updateService(obj, _ interface{}) {
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
c.enqueue(apiService)
}
}
func (c *APIServiceRegistrationController) deleteService(obj interface{}) {
castObj, ok := obj.(*v1.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
castObj, ok = tombstone.Obj.(*v1.Service)
if !ok {
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
return
}
}
for _, apiService := range c.getAPIServicesFor(castObj) {
c.enqueue(apiService)
}
}

View File

@ -0,0 +1,139 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
func TestGetDestinationHost(t *testing.T) {
tests := []struct {
name string
services []*v1.Service
apiService *apiregistration.APIService
expected string
}{
{
name: "cluster ip",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: "hit",
},
},
},
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "hit",
},
{
name: "loadbalancer",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeLoadBalancer,
ClusterIP: "lb",
},
},
},
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "lb",
},
{
name: "node port",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeNodePort,
ClusterIP: "np",
},
},
},
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "np",
},
{
name: "missing service",
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "alfa.one.svc",
},
}
for _, test := range tests {
serviceCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceLister := v1listers.NewServiceLister(serviceCache)
c := &APIServiceRegistrationController{
serviceLister: serviceLister,
}
for i := range test.services {
serviceCache.Add(test.services[i])
}
actual := c.getDestinationHost(test.apiService)
if actual != test.expected {
t.Errorf("%s expected %v, got %v", test.name, test.expected, actual)
}
}
}

View File

@ -176,17 +176,18 @@ func (r *responder) Error(err error) {
// these methods provide locked access to fields // these methods provide locked access to fields
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) { func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService, destinationHost string) {
if apiService.Spec.Service == nil { if apiService.Spec.Service == nil {
r.handlingInfo.Store(proxyHandlingInfo{local: true}) r.handlingInfo.Store(proxyHandlingInfo{local: true})
return return
} }
newInfo := proxyHandlingInfo{ newInfo := proxyHandlingInfo{
destinationHost: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc", destinationHost: destinationHost,
restConfig: &restclient.Config{ restConfig: &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{ TLSClientConfig: restclient.TLSClientConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify, Insecure: apiService.Spec.InsecureSkipTLSVerify,
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
CertData: r.proxyClientCert, CertData: r.proxyClientCert,
KeyData: r.proxyClientKey, KeyData: r.proxyClientKey,
CAData: apiService.Spec.CABundle, CAData: apiService.Spec.CABundle,

View File

@ -164,7 +164,7 @@ func TestProxyHandler(t *testing.T) {
handler.contextMapper = &fakeRequestContextMapper{user: tc.user} handler.contextMapper = &fakeRequestContextMapper{user: tc.user}
handler.removeAPIService() handler.removeAPIService()
if tc.apiService != nil { if tc.apiService != nil {
handler.updateAPIService(tc.apiService) handler.updateAPIService(tc.apiService, tc.apiService.Spec.Service.Name+"."+tc.apiService.Spec.Service.Namespace+".svc")
curr := handler.handlingInfo.Load().(proxyHandlingInfo) curr := handler.handlingInfo.Load().(proxyHandlingInfo)
curr.destinationHost = targetServer.Listener.Addr().String() curr.destinationHost = targetServer.Listener.Addr().String()
handler.handlingInfo.Store(curr) handler.handlingInfo.Store(curr)

View File

@ -25,7 +25,6 @@ import (
"os" "os"
"path" "path"
"strconv" "strconv"
"strings"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -331,13 +330,10 @@ func TestAggregatedAPIServer(t *testing.T) {
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{ _, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."}, ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistrationv1alpha1.APIServiceSpec{ Spec: apiregistrationv1alpha1.APIServiceSpec{
Service: &apiregistrationv1alpha1.ServiceReference{ // register this as a loca service so it doesn't try to lookup the default kubernetes service
Namespace: "default", // which will have an unroutable IP address since its fake.
Name: "kubernetes",
},
Group: "", Group: "",
Version: "v1", Version: "v1",
CABundle: kubeClientConfig.CAData,
Priority: 100, Priority: 100,
}, },
}) })
@ -349,7 +345,7 @@ func TestAggregatedAPIServer(t *testing.T) {
// (the service is missing), we don't have an external signal. // (the service is missing), we don't have an external signal.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
_, err = aggregatorDiscoveryClient.Discovery().ServerResources() _, err = aggregatorDiscoveryClient.Discovery().ServerResources()
if err != nil && !strings.Contains(err.Error(), "lookup kubernetes.default.svc") { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

3
vendor/BUILD vendored
View File

@ -15456,6 +15456,7 @@ go_library(
go_test( go_test(
name = "k8s.io/kube-aggregator/pkg/apiserver_test", name = "k8s.io/kube-aggregator/pkg/apiserver_test",
srcs = [ srcs = [
"k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller_test.go",
"k8s.io/kube-aggregator/pkg/apiserver/handler_apis_test.go", "k8s.io/kube-aggregator/pkg/apiserver/handler_apis_test.go",
"k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go", "k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go",
], ],
@ -15506,8 +15507,10 @@ go_library(
"//vendor:k8s.io/apiserver/pkg/registry/rest", "//vendor:k8s.io/apiserver/pkg/registry/rest",
"//vendor:k8s.io/apiserver/pkg/server", "//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/client-go/informers", "//vendor:k8s.io/client-go/informers",
"//vendor:k8s.io/client-go/informers/core/v1",
"//vendor:k8s.io/client-go/kubernetes", "//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/client-go/listers/core/v1", "//vendor:k8s.io/client-go/listers/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/pkg/version", "//vendor:k8s.io/client-go/pkg/version",
"//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/cache",