Merge pull request #46680 from cheftako/aggregate

Automatic merge from submit-queue (batch tested with PRs 46681, 46786, 46264, 46680, 46805)

Enable Dialer on the Aggregator

Centralize the creation of the dialer during startup.
Have the dialer then passed in to both APIServer and Aggregator.
Aggregator the uses the dialer as its Transport base.

**What this PR does / why we need it**:Enables the Aggregator to use the Dialer/SSHTunneler to connect to the user-apiserver.

**Which issue this PR fixes** : fixes ##46679

**Special notes for your reviewer**:

**Release note**: None
This commit is contained in:
Kubernetes Submit Queue 2017-06-03 21:16:46 -07:00 committed by GitHub
commit f28fe811ad
8 changed files with 93 additions and 62 deletions

View File

@ -85,11 +85,10 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
}
return aggregatorConfig, nil
}
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, proxyTransport *http.Transport) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, proxyTransport)
if err != nil {
return nil, err
}

View File

@ -102,7 +102,11 @@ cluster's shared state through which all other components interact.`,
// Run runs the specified APIServer. This should never exit.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions)
tunneler, proxyTransport, err := CreateDialer(runOptions)
if err != nil {
return err
}
kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, tunneler, proxyTransport)
if err != nil {
return err
}
@ -145,7 +149,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers, proxyTransport)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err
@ -175,8 +179,55 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g
return kubeAPIServer, nil
}
// CreateDialer creates the dialer infrastructure and makes it available to APIServer and Aggregator
func CreateDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
// Setup nodeTunneler if needed
var nodeTunneler tunneler.Tunneler
var proxyDialerFn utilnet.DialFunc
if len(s.SSHUser) > 0 {
// Get ssh key distribution func, if supported
var installSSHKey tunneler.InstallSSHKey
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
if err != nil {
return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
}
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
installSSHKey = instances.AddSSHKeyToAllInstances
}
}
if s.KubeletConfig.Port == 0 {
return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
}
if s.KubeletConfig.ReadOnlyPort == 0 {
return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
}
// Set up the nodeTunneler
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
// kubelet listen-addresses, we need to plumb through options.
healthCheckPath := &url.URL{
Scheme: "http",
Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
Path: "healthz",
}
nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
// Use the nodeTunneler's dialer to connect to the kubelet
s.KubeletConfig.Dial = nodeTunneler.Dial
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
proxyDialerFn = nodeTunneler.Dial
}
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
Dial: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
return nodeTunneler, proxyTransport, nil
}
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
// register all admission plugins
registerAllAdmissionPlugins(s.Admission.Plugins)
@ -210,49 +261,6 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, inf
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
})
// Setup nodeTunneler if needed
var nodeTunneler tunneler.Tunneler
var proxyDialerFn utilnet.DialFunc
if len(s.SSHUser) > 0 {
// Get ssh key distribution func, if supported
var installSSHKey tunneler.InstallSSHKey
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
if err != nil {
return nil, nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
}
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
installSSHKey = instances.AddSSHKeyToAllInstances
}
}
if s.KubeletConfig.Port == 0 {
return nil, nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
}
if s.KubeletConfig.ReadOnlyPort == 0 {
return nil, nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
}
// Set up the nodeTunneler
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
// kubelet listen-addresses, we need to plumb through options.
healthCheckPath := &url.URL{
Scheme: "http",
Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
Path: "healthz",
}
nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
// Use the nodeTunneler's dialer to connect to the kubelet
s.KubeletConfig.Dial = nodeTunneler.Dial
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
proxyDialerFn = nodeTunneler.Dial
}
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
Dial: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return nil, nil, nil, err

View File

@ -111,6 +111,7 @@ type APIAggregator struct {
// this to confirm the proxy's identity
proxyClientCert []byte
proxyClientKey []byte
proxyTransport *http.Transport
// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
proxyHandlers map[string]*proxyHandler
@ -159,7 +160,7 @@ func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url
}
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, proxyTransport *http.Transport) (*APIAggregator, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
@ -193,6 +194,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
contextMapper: c.GenericConfig.RequestContextMapper,
proxyClientCert: c.ProxyClientCert,
proxyClientKey: c.ProxyClientKey,
proxyTransport: proxyTransport,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
@ -267,6 +269,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
localDelegate: s.delegateHandler,
proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey,
proxyTransport: s.proxyTransport,
routing: s.routing,
}
proxyHandler.updateAPIService(apiService)

View File

@ -18,6 +18,8 @@ package apiserver
import (
"context"
"fmt"
"github.com/golang/glog"
"net/http"
"net/url"
"sync/atomic"
@ -33,7 +35,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
@ -49,6 +50,7 @@ type proxyHandler struct {
// this to confirm the proxy's identity
proxyClientCert []byte
proxyClientKey []byte
proxyTransport *http.Transport
// Endpoints based routing to map from cluster IP to routable IP
routing ServiceResolver
@ -93,11 +95,6 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
http.Error(w, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError)
return
}
proxyRoundTripper := handlingInfo.proxyRoundTripper
if proxyRoundTripper == nil {
http.Error(w, "", http.StatusNotFound)
return
}
ctx, ok := r.contextMapper.Get(req)
if !ok {
@ -115,7 +112,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
location.Scheme = "https"
rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
if err != nil {
http.Error(w, "missing route", http.StatusInternalServerError)
http.Error(w, fmt.Sprintf("missing route (%s)", err.Error()), http.StatusInternalServerError)
return
}
location.Host = rloc.Host
@ -127,7 +124,13 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newReq.Header = utilnet.CloneHeader(req.Header)
newReq.URL = location
var proxyRoundTripper http.RoundTripper
upgrade := false
proxyRoundTripper = handlingInfo.proxyRoundTripper
if proxyRoundTripper == nil {
http.Error(w, "", http.StatusNotFound)
return
}
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req)
if err != nil {
@ -205,5 +208,14 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic
serviceNamespace: apiService.Spec.Service.Namespace,
}
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
if newInfo.transportBuildingError == nil && r.proxyTransport.Dial != nil {
switch transport := newInfo.proxyRoundTripper.(type) {
case *http.Transport:
transport.Dial = r.proxyTransport.Dial
default:
newInfo.transportBuildingError = fmt.Errorf("Unable to set dialer for %s as rest transport is of type %T", apiService.Spec.Service.Name, newInfo.proxyRoundTripper)
glog.Warning(newInfo.transportBuildingError.Error())
}
}
r.handlingInfo.Store(newInfo)
}

View File

@ -170,8 +170,9 @@ func TestProxyHandler(t *testing.T) {
func() {
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()},
localDelegate: http.NewServeMux(),
routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()},
proxyTransport: &http.Transport{},
}
handler.contextMapper = &fakeRequestContextMapper{user: tc.user}
server := httptest.NewServer(handler)

View File

@ -157,7 +157,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
return err
}
server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate)
server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, nil)
if err != nil {
return err
}

View File

@ -604,7 +604,11 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
kubeAPIServerOptions.SecureServing.BindPort = kubePort
kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions)
tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}

View File

@ -112,7 +112,11 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
kubeAPIServerOptions.Authorization.Mode = "RBAC"
kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions)
tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}