From 5b3f4684edc9653ee2930855b9486f495fd3c0b6 Mon Sep 17 00:00:00 2001 From: Walter Fender Date: Sun, 28 May 2017 20:28:41 -0700 Subject: [PATCH] Enable Dialer on the Aggregator Centralize the creation of the dialer during startup. Have the dialer then passed in to both APIServer and Aggregator. Aggregator the sets the dialer on its Transport base. This should allow the SSTunnel to be used but also allow the Aggregation Auth to work with it. Depending on Environment InsecureSkipTLSVerify *may* need to be set to true. Fixed as few tests to call CreateDialer as part of start-up. --- cmd/kube-apiserver/app/aggregator.go | 5 +- cmd/kube-apiserver/app/server.go | 100 ++++++++++-------- .../pkg/apiserver/apiserver.go | 5 +- .../pkg/apiserver/handler_proxy.go | 26 +++-- .../pkg/apiserver/handler_proxy_test.go | 5 +- .../kube-aggregator/pkg/cmd/server/start.go | 2 +- .../etcd/etcd_storage_path_test.go | 6 +- test/integration/examples/apiserver_test.go | 6 +- 8 files changed, 93 insertions(+), 62 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 3aa6279b93d..dfaeb5b2f3a 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -89,11 +89,10 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command } return aggregatorConfig, nil - } -func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) { - aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer) +func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, proxyTransport *http.Transport) (*aggregatorapiserver.APIAggregator, error) { + aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, proxyTransport) if err != nil { return nil, err } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 2e92c5fe14a..ff5f791fa4e 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -101,7 +101,11 @@ cluster's shared state through which all other components interact.`, // Run runs the specified APIServer. This should never exit. func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { - kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions) + tunneler, proxyTransport, err := CreateDialer(runOptions) + if err != nil { + return err + } + kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, tunneler, proxyTransport) if err != nil { return err } @@ -144,7 +148,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { if err != nil { return err } - aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers) + aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers, proxyTransport) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines return err @@ -174,8 +178,55 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g return kubeAPIServer, nil } +// CreateDialer creates the dialer infrastructure and makes it available to APIServer and Aggregator +func CreateDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) { + // Setup nodeTunneler if needed + var nodeTunneler tunneler.Tunneler + var proxyDialerFn utilnet.DialFunc + if len(s.SSHUser) > 0 { + // Get ssh key distribution func, if supported + var installSSHKey tunneler.InstallSSHKey + cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile) + if err != nil { + return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err) + } + if cloud != nil { + if instances, supported := cloud.Instances(); supported { + installSSHKey = instances.AddSSHKeyToAllInstances + } + } + if s.KubeletConfig.Port == 0 { + return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified") + } + if s.KubeletConfig.ReadOnlyPort == 0 { + return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified") + } + // Set up the nodeTunneler + // TODO(cjcullen): If we want this to handle per-kubelet ports or other + // kubelet listen-addresses, we need to plumb through options. + healthCheckPath := &url.URL{ + Scheme: "http", + Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)), + Path: "healthz", + } + nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey) + + // Use the nodeTunneler's dialer to connect to the kubelet + s.KubeletConfig.Dial = nodeTunneler.Dial + // Use the nodeTunneler's dialer when proxying to pods, services, and nodes + proxyDialerFn = nodeTunneler.Dial + } + // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname + proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} + proxyTransport := utilnet.SetTransportDefaults(&http.Transport{ + Dial: proxyDialerFn, + TLSClientConfig: proxyTLSClientConfig, + }) + return nodeTunneler, proxyTransport, nil +} + // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them -func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) { +func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) { // register all admission plugins registerAllAdmissionPlugins(s.Admission.Plugins) @@ -209,49 +260,6 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, inf PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec, }) - // Setup nodeTunneler if needed - var nodeTunneler tunneler.Tunneler - var proxyDialerFn utilnet.DialFunc - if len(s.SSHUser) > 0 { - // Get ssh key distribution func, if supported - var installSSHKey tunneler.InstallSSHKey - cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile) - if err != nil { - return nil, nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err) - } - if cloud != nil { - if instances, supported := cloud.Instances(); supported { - installSSHKey = instances.AddSSHKeyToAllInstances - } - } - if s.KubeletConfig.Port == 0 { - return nil, nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified") - } - if s.KubeletConfig.ReadOnlyPort == 0 { - return nil, nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified") - } - // Set up the nodeTunneler - // TODO(cjcullen): If we want this to handle per-kubelet ports or other - // kubelet listen-addresses, we need to plumb through options. - healthCheckPath := &url.URL{ - Scheme: "http", - Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)), - Path: "healthz", - } - nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey) - - // Use the nodeTunneler's dialer to connect to the kubelet - s.KubeletConfig.Dial = nodeTunneler.Dial - // Use the nodeTunneler's dialer when proxying to pods, services, and nodes - proxyDialerFn = nodeTunneler.Dial - } - // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname - proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} - proxyTransport := utilnet.SetTransportDefaults(&http.Transport{ - Dial: proxyDialerFn, - TLSClientConfig: proxyTLSClientConfig, - }) - serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) if err != nil { return nil, nil, nil, err diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index a502582168d..87435586661 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -111,6 +111,7 @@ type APIAggregator struct { // this to confirm the proxy's identity proxyClientCert []byte proxyClientKey []byte + proxyTransport *http.Transport // proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name proxyHandlers map[string]*proxyHandler @@ -159,7 +160,7 @@ func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url } // New returns a new instance of APIAggregator from the given config. -func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { +func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, proxyTransport *http.Transport) (*APIAggregator, error) { genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time if err != nil { return nil, err @@ -193,6 +194,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg contextMapper: c.GenericConfig.RequestContextMapper, proxyClientCert: c.ProxyClientCert, proxyClientKey: c.ProxyClientKey, + proxyTransport: proxyTransport, proxyHandlers: map[string]*proxyHandler{}, handledGroups: sets.String{}, lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), @@ -267,6 +269,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { localDelegate: s.delegateHandler, proxyClientCert: s.proxyClientCert, proxyClientKey: s.proxyClientKey, + proxyTransport: s.proxyTransport, routing: s.routing, } proxyHandler.updateAPIService(apiService) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 397b0ab528f..eb5db565244 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -18,6 +18,8 @@ package apiserver import ( "context" + "fmt" + "github.com/golang/glog" "net/http" "net/url" "sync/atomic" @@ -33,7 +35,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" - apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration" ) @@ -49,6 +50,7 @@ type proxyHandler struct { // this to confirm the proxy's identity proxyClientCert []byte proxyClientKey []byte + proxyTransport *http.Transport // Endpoints based routing to map from cluster IP to routable IP routing ServiceResolver @@ -93,11 +95,6 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { http.Error(w, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError) return } - proxyRoundTripper := handlingInfo.proxyRoundTripper - if proxyRoundTripper == nil { - http.Error(w, "", http.StatusNotFound) - return - } ctx, ok := r.contextMapper.Get(req) if !ok { @@ -115,7 +112,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location.Scheme = "https" rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) if err != nil { - http.Error(w, "missing route", http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("missing route (%s)", err.Error()), http.StatusInternalServerError) return } location.Host = rloc.Host @@ -127,7 +124,13 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { newReq.Header = utilnet.CloneHeader(req.Header) newReq.URL = location + var proxyRoundTripper http.RoundTripper upgrade := false + proxyRoundTripper = handlingInfo.proxyRoundTripper + if proxyRoundTripper == nil { + http.Error(w, "", http.StatusNotFound) + return + } // we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req) if err != nil { @@ -205,5 +208,14 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic serviceNamespace: apiService.Spec.Service.Namespace, } newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig) + if newInfo.transportBuildingError == nil && r.proxyTransport.Dial != nil { + switch transport := newInfo.proxyRoundTripper.(type) { + case *http.Transport: + transport.Dial = r.proxyTransport.Dial + default: + newInfo.transportBuildingError = fmt.Errorf("Unable to set dialer for %s as rest transport is of type %T", apiService.Spec.Service.Name, newInfo.proxyRoundTripper) + glog.Warning(newInfo.transportBuildingError.Error()) + } + } r.handlingInfo.Store(newInfo) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index d03a79fa87d..b4258ecdd2d 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -170,8 +170,9 @@ func TestProxyHandler(t *testing.T) { func() { handler := &proxyHandler{ - localDelegate: http.NewServeMux(), - routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}, + localDelegate: http.NewServeMux(), + routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}, + proxyTransport: &http.Transport{}, } handler.contextMapper = &fakeRequestContextMapper{user: tc.user} server := httptest.NewServer(handler) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index ab0b153ae5b..559307c73fb 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -157,7 +157,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { return err } - server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate) + server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, nil) if err != nil { return err } diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index e2712ab98bb..bf1da2b3c6f 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -600,7 +600,11 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV kubeAPIServerOptions.SecureServing.BindPort = kubePort - kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions) + tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions) + if err != nil { + t.Fatal(err) + } + kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 7766282e7d2..37a94c088a7 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -112,7 +112,11 @@ func TestAggregatedAPIServer(t *testing.T) { kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name() kubeAPIServerOptions.Authorization.Mode = "RBAC" - kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions) + tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions) + if err != nil { + t.Fatal(err) + } + kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) }