mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #43383 from deads2k/server-10-safe-proxy
Automatic merge from submit-queue proxy to IP instead of name, but still use host verification I think I found a setting that lets us proxy to an IP and still do hostname verification on the certificate. @liggitt @sttts Can you see if you agree that this knob does what I think it does? Last commit only, still needs tests.
This commit is contained in:
commit
dfbbb115dd
@ -119,6 +119,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
|
||||
}
|
||||
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh)
|
||||
if err != nil {
|
||||
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
|
||||
return err
|
||||
}
|
||||
return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
|
||||
|
@ -179,7 +179,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
}
|
||||
s.GenericAPIServer.HandlerContainer.Handle("/apis", apisHandler)
|
||||
|
||||
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s)
|
||||
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s)
|
||||
|
||||
s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
informerFactory.Start(stopCh)
|
||||
@ -196,11 +196,11 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
|
||||
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
|
||||
// It's a slow moving API, so its ok to run the controller on a single thread
|
||||
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
|
||||
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, destinationHost string) {
|
||||
// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
|
||||
// since they are wired against listers because they require multiple resources to respond
|
||||
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
|
||||
proxyHandler.updateAPIService(apiService)
|
||||
proxyHandler.updateAPIService(apiService, destinationHost)
|
||||
return
|
||||
}
|
||||
|
||||
@ -217,7 +217,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
|
||||
proxyClientCert: s.proxyClientCert,
|
||||
proxyClientKey: s.proxyClientKey,
|
||||
}
|
||||
proxyHandler.updateAPIService(apiService)
|
||||
proxyHandler.updateAPIService(apiService, destinationHost)
|
||||
s.proxyHandlers[apiService.Name] = proxyHandler
|
||||
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath, proxyHandler)
|
||||
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath+"/", proxyHandler)
|
||||
|
@ -23,8 +23,12 @@ import (
|
||||
"github.com/golang/glog"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
v1informers "k8s.io/client-go/informers/core/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
@ -34,14 +38,19 @@ import (
|
||||
)
|
||||
|
||||
type APIHandlerManager interface {
|
||||
AddAPIService(apiServer *apiregistration.APIService)
|
||||
RemoveAPIService(apiServerName string)
|
||||
AddAPIService(apiService *apiregistration.APIService, destinationHost string)
|
||||
RemoveAPIService(apiServiceName string)
|
||||
}
|
||||
|
||||
type APIServiceRegistrationController struct {
|
||||
apiHandlerManager APIHandlerManager
|
||||
|
||||
apiServerLister listers.APIServiceLister
|
||||
apiServiceLister listers.APIServiceLister
|
||||
apiServiceSynced cache.InformerSynced
|
||||
|
||||
// serviceLister is used to get the IP to create the transport for
|
||||
serviceLister v1listers.ServiceLister
|
||||
servicesSynced cache.InformerSynced
|
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error
|
||||
@ -49,26 +58,35 @@ type APIServiceRegistrationController struct {
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func NewAPIServiceRegistrationController(apiServerInformer informers.APIServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController {
|
||||
func NewAPIServiceRegistrationController(apiServiceInformer informers.APIServiceInformer, serviceInformer v1informers.ServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController {
|
||||
c := &APIServiceRegistrationController{
|
||||
apiHandlerManager: apiHandlerManager,
|
||||
apiServerLister: apiServerInformer.Lister(),
|
||||
apiServiceLister: apiServiceInformer.Lister(),
|
||||
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
|
||||
serviceLister: serviceInformer.Lister(),
|
||||
servicesSynced: serviceInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"),
|
||||
}
|
||||
|
||||
apiServerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addAPIService,
|
||||
UpdateFunc: c.updateAPIService,
|
||||
DeleteFunc: c.deleteAPIService,
|
||||
})
|
||||
|
||||
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addService,
|
||||
UpdateFunc: c.updateService,
|
||||
DeleteFunc: c.deleteService,
|
||||
})
|
||||
|
||||
c.syncFn = c.sync
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *APIServiceRegistrationController) sync(key string) error {
|
||||
apiServer, err := c.apiServerLister.Get(key)
|
||||
apiService, err := c.apiServiceLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
c.apiHandlerManager.RemoveAPIService(key)
|
||||
return nil
|
||||
@ -77,10 +95,32 @@ func (c *APIServiceRegistrationController) sync(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
c.apiHandlerManager.AddAPIService(apiServer)
|
||||
c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *APIServiceRegistrationController) getDestinationHost(apiService *apiregistration.APIService) string {
|
||||
if apiService.Spec.Service == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
destinationHost := apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc"
|
||||
service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
if err != nil {
|
||||
return destinationHost
|
||||
}
|
||||
switch {
|
||||
// use IP from a clusterIP for these service types
|
||||
case service.Spec.Type == v1.ServiceTypeClusterIP,
|
||||
service.Spec.Type == v1.ServiceTypeNodePort,
|
||||
service.Spec.Type == v1.ServiceTypeLoadBalancer:
|
||||
return service.Spec.ClusterIP
|
||||
}
|
||||
|
||||
// return the normal DNS name by default
|
||||
return destinationHost
|
||||
}
|
||||
|
||||
func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
@ -88,6 +128,11 @@ func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
|
||||
|
||||
glog.Infof("Starting APIServiceRegistrationController")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced, c.servicesSynced) {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||
return
|
||||
}
|
||||
|
||||
// only start one worker thread since its a slow moving API and the aggregation server adding bits
|
||||
// aren't threadsafe
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
@ -159,3 +204,52 @@ func (c *APIServiceRegistrationController) deleteAPIService(obj interface{}) {
|
||||
glog.V(4).Infof("Deleting %q", castObj.Name)
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
|
||||
// there aren't very many apiservices, just check them all.
|
||||
func (c *APIServiceRegistrationController) getAPIServicesFor(service *v1.Service) []*apiregistration.APIService {
|
||||
var ret []*apiregistration.APIService
|
||||
apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
|
||||
for _, apiService := range apiServiceList {
|
||||
if apiService.Spec.Service == nil {
|
||||
continue
|
||||
}
|
||||
if apiService.Spec.Service.Namespace == service.Namespace && apiService.Spec.Service.Name == service.Name {
|
||||
ret = append(ret, apiService)
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// TODO, think of a way to avoid checking on every service manipulation
|
||||
|
||||
func (c *APIServiceRegistrationController) addService(obj interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *APIServiceRegistrationController) updateService(obj, _ interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *APIServiceRegistrationController) deleteService(obj interface{}) {
|
||||
castObj, ok := obj.(*v1.Service)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*v1.Service)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, apiService := range c.getAPIServicesFor(castObj) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,139 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
)
|
||||
|
||||
func TestGetDestinationHost(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
services []*v1.Service
|
||||
apiService *apiregistration.APIService
|
||||
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "cluster ip",
|
||||
services: []*v1.Service{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
ClusterIP: "hit",
|
||||
},
|
||||
},
|
||||
},
|
||||
apiService: &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
|
||||
Spec: apiregistration.APIServiceSpec{
|
||||
Service: &apiregistration.ServiceReference{
|
||||
Namespace: "one",
|
||||
Name: "alfa",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
expected: "hit",
|
||||
},
|
||||
{
|
||||
name: "loadbalancer",
|
||||
services: []*v1.Service{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeLoadBalancer,
|
||||
ClusterIP: "lb",
|
||||
},
|
||||
},
|
||||
},
|
||||
apiService: &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
|
||||
Spec: apiregistration.APIServiceSpec{
|
||||
Service: &apiregistration.ServiceReference{
|
||||
Namespace: "one",
|
||||
Name: "alfa",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
expected: "lb",
|
||||
},
|
||||
{
|
||||
name: "node port",
|
||||
services: []*v1.Service{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeNodePort,
|
||||
ClusterIP: "np",
|
||||
},
|
||||
},
|
||||
},
|
||||
apiService: &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
|
||||
Spec: apiregistration.APIServiceSpec{
|
||||
Service: &apiregistration.ServiceReference{
|
||||
Namespace: "one",
|
||||
Name: "alfa",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
expected: "np",
|
||||
},
|
||||
{
|
||||
name: "missing service",
|
||||
apiService: &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
|
||||
Spec: apiregistration.APIServiceSpec{
|
||||
Service: &apiregistration.ServiceReference{
|
||||
Namespace: "one",
|
||||
Name: "alfa",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
expected: "alfa.one.svc",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
serviceCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceLister := v1listers.NewServiceLister(serviceCache)
|
||||
c := &APIServiceRegistrationController{
|
||||
serviceLister: serviceLister,
|
||||
}
|
||||
for i := range test.services {
|
||||
serviceCache.Add(test.services[i])
|
||||
}
|
||||
|
||||
actual := c.getDestinationHost(test.apiService)
|
||||
if actual != test.expected {
|
||||
t.Errorf("%s expected %v, got %v", test.name, test.expected, actual)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -176,20 +176,21 @@ func (r *responder) Error(err error) {
|
||||
|
||||
// these methods provide locked access to fields
|
||||
|
||||
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) {
|
||||
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService, destinationHost string) {
|
||||
if apiService.Spec.Service == nil {
|
||||
r.handlingInfo.Store(proxyHandlingInfo{local: true})
|
||||
return
|
||||
}
|
||||
|
||||
newInfo := proxyHandlingInfo{
|
||||
destinationHost: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
|
||||
destinationHost: destinationHost,
|
||||
restConfig: &restclient.Config{
|
||||
TLSClientConfig: restclient.TLSClientConfig{
|
||||
Insecure: apiService.Spec.InsecureSkipTLSVerify,
|
||||
CertData: r.proxyClientCert,
|
||||
KeyData: r.proxyClientKey,
|
||||
CAData: apiService.Spec.CABundle,
|
||||
Insecure: apiService.Spec.InsecureSkipTLSVerify,
|
||||
ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
|
||||
CertData: r.proxyClientCert,
|
||||
KeyData: r.proxyClientKey,
|
||||
CAData: apiService.Spec.CABundle,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -164,7 +164,7 @@ func TestProxyHandler(t *testing.T) {
|
||||
handler.contextMapper = &fakeRequestContextMapper{user: tc.user}
|
||||
handler.removeAPIService()
|
||||
if tc.apiService != nil {
|
||||
handler.updateAPIService(tc.apiService)
|
||||
handler.updateAPIService(tc.apiService, tc.apiService.Spec.Service.Name+"."+tc.apiService.Spec.Service.Namespace+".svc")
|
||||
curr := handler.handlingInfo.Load().(proxyHandlingInfo)
|
||||
curr.destinationHost = targetServer.Listener.Addr().String()
|
||||
handler.handlingInfo.Store(curr)
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@ -331,13 +330,10 @@ func TestAggregatedAPIServer(t *testing.T) {
|
||||
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
|
||||
Spec: apiregistrationv1alpha1.APIServiceSpec{
|
||||
Service: &apiregistrationv1alpha1.ServiceReference{
|
||||
Namespace: "default",
|
||||
Name: "kubernetes",
|
||||
},
|
||||
// register this as a loca service so it doesn't try to lookup the default kubernetes service
|
||||
// which will have an unroutable IP address since its fake.
|
||||
Group: "",
|
||||
Version: "v1",
|
||||
CABundle: kubeClientConfig.CAData,
|
||||
Priority: 100,
|
||||
},
|
||||
})
|
||||
@ -349,7 +345,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
||||
// (the service is missing), we don't have an external signal.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
_, err = aggregatorDiscoveryClient.Discovery().ServerResources()
|
||||
if err != nil && !strings.Contains(err.Error(), "lookup kubernetes.default.svc") {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
3
vendor/BUILD
vendored
3
vendor/BUILD
vendored
@ -15487,6 +15487,7 @@ go_library(
|
||||
go_test(
|
||||
name = "k8s.io/kube-aggregator/pkg/apiserver_test",
|
||||
srcs = [
|
||||
"k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller_test.go",
|
||||
"k8s.io/kube-aggregator/pkg/apiserver/handler_apis_test.go",
|
||||
"k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go",
|
||||
],
|
||||
@ -15537,8 +15538,10 @@ go_library(
|
||||
"//vendor:k8s.io/apiserver/pkg/registry/rest",
|
||||
"//vendor:k8s.io/apiserver/pkg/server",
|
||||
"//vendor:k8s.io/client-go/informers",
|
||||
"//vendor:k8s.io/client-go/informers/core/v1",
|
||||
"//vendor:k8s.io/client-go/kubernetes",
|
||||
"//vendor:k8s.io/client-go/listers/core/v1",
|
||||
"//vendor:k8s.io/client-go/pkg/api/v1",
|
||||
"//vendor:k8s.io/client-go/pkg/version",
|
||||
"//vendor:k8s.io/client-go/rest",
|
||||
"//vendor:k8s.io/client-go/tools/cache",
|
||||
|
Loading…
Reference in New Issue
Block a user