diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index bbdc069ff71..f715a354ea5 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -85,11 +85,10 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command } return aggregatorConfig, nil - } -func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) { - aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer) +func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, proxyTransport *http.Transport) (*aggregatorapiserver.APIAggregator, error) { + aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, proxyTransport) if err != nil { return nil, err } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 081abddc66f..ae52ed0f62b 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -102,7 +102,11 @@ cluster's shared state through which all other components interact.`, // Run runs the specified APIServer. This should never exit. func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { - kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions) + tunneler, proxyTransport, err := CreateDialer(runOptions) + if err != nil { + return err + } + kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, tunneler, proxyTransport) if err != nil { return err } @@ -145,7 +149,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { if err != nil { return err } - aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers) + aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers, proxyTransport) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines return err @@ -175,8 +179,55 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g return kubeAPIServer, nil } +// CreateDialer creates the dialer infrastructure and makes it available to APIServer and Aggregator +func CreateDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) { + // Setup nodeTunneler if needed + var nodeTunneler tunneler.Tunneler + var proxyDialerFn utilnet.DialFunc + if len(s.SSHUser) > 0 { + // Get ssh key distribution func, if supported + var installSSHKey tunneler.InstallSSHKey + cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile) + if err != nil { + return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err) + } + if cloud != nil { + if instances, supported := cloud.Instances(); supported { + installSSHKey = instances.AddSSHKeyToAllInstances + } + } + if s.KubeletConfig.Port == 0 { + return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified") + } + if s.KubeletConfig.ReadOnlyPort == 0 { + return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified") + } + // Set up the nodeTunneler + // TODO(cjcullen): If we want this to handle per-kubelet ports or other + // kubelet listen-addresses, we need to plumb through options. + healthCheckPath := &url.URL{ + Scheme: "http", + Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)), + Path: "healthz", + } + nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey) + + // Use the nodeTunneler's dialer to connect to the kubelet + s.KubeletConfig.Dial = nodeTunneler.Dial + // Use the nodeTunneler's dialer when proxying to pods, services, and nodes + proxyDialerFn = nodeTunneler.Dial + } + // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname + proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} + proxyTransport := utilnet.SetTransportDefaults(&http.Transport{ + Dial: proxyDialerFn, + TLSClientConfig: proxyTLSClientConfig, + }) + return nodeTunneler, proxyTransport, nil +} + // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them -func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) { +func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunneler.Tunneler, proxyTransport http.RoundTripper) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) { // register all admission plugins registerAllAdmissionPlugins(s.Admission.Plugins) @@ -210,49 +261,6 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, inf PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec, }) - // Setup nodeTunneler if needed - var nodeTunneler tunneler.Tunneler - var proxyDialerFn utilnet.DialFunc - if len(s.SSHUser) > 0 { - // Get ssh key distribution func, if supported - var installSSHKey tunneler.InstallSSHKey - cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile) - if err != nil { - return nil, nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err) - } - if cloud != nil { - if instances, supported := cloud.Instances(); supported { - installSSHKey = instances.AddSSHKeyToAllInstances - } - } - if s.KubeletConfig.Port == 0 { - return nil, nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified") - } - if s.KubeletConfig.ReadOnlyPort == 0 { - return nil, nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified") - } - // Set up the nodeTunneler - // TODO(cjcullen): If we want this to handle per-kubelet ports or other - // kubelet listen-addresses, we need to plumb through options. - healthCheckPath := &url.URL{ - Scheme: "http", - Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)), - Path: "healthz", - } - nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey) - - // Use the nodeTunneler's dialer to connect to the kubelet - s.KubeletConfig.Dial = nodeTunneler.Dial - // Use the nodeTunneler's dialer when proxying to pods, services, and nodes - proxyDialerFn = nodeTunneler.Dial - } - // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname - proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} - proxyTransport := utilnet.SetTransportDefaults(&http.Transport{ - Dial: proxyDialerFn, - TLSClientConfig: proxyTLSClientConfig, - }) - serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) if err != nil { return nil, nil, nil, err diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index a502582168d..87435586661 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -111,6 +111,7 @@ type APIAggregator struct { // this to confirm the proxy's identity proxyClientCert []byte proxyClientKey []byte + proxyTransport *http.Transport // proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name proxyHandlers map[string]*proxyHandler @@ -159,7 +160,7 @@ func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url } // New returns a new instance of APIAggregator from the given config. -func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { +func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, proxyTransport *http.Transport) (*APIAggregator, error) { genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time if err != nil { return nil, err @@ -193,6 +194,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg contextMapper: c.GenericConfig.RequestContextMapper, proxyClientCert: c.ProxyClientCert, proxyClientKey: c.ProxyClientKey, + proxyTransport: proxyTransport, proxyHandlers: map[string]*proxyHandler{}, handledGroups: sets.String{}, lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), @@ -267,6 +269,7 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { localDelegate: s.delegateHandler, proxyClientCert: s.proxyClientCert, proxyClientKey: s.proxyClientKey, + proxyTransport: s.proxyTransport, routing: s.routing, } proxyHandler.updateAPIService(apiService) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 397b0ab528f..eb5db565244 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -18,6 +18,8 @@ package apiserver import ( "context" + "fmt" + "github.com/golang/glog" "net/http" "net/url" "sync/atomic" @@ -33,7 +35,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" - apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration" ) @@ -49,6 +50,7 @@ type proxyHandler struct { // this to confirm the proxy's identity proxyClientCert []byte proxyClientKey []byte + proxyTransport *http.Transport // Endpoints based routing to map from cluster IP to routable IP routing ServiceResolver @@ -93,11 +95,6 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { http.Error(w, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError) return } - proxyRoundTripper := handlingInfo.proxyRoundTripper - if proxyRoundTripper == nil { - http.Error(w, "", http.StatusNotFound) - return - } ctx, ok := r.contextMapper.Get(req) if !ok { @@ -115,7 +112,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location.Scheme = "https" rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) if err != nil { - http.Error(w, "missing route", http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("missing route (%s)", err.Error()), http.StatusInternalServerError) return } location.Host = rloc.Host @@ -127,7 +124,13 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { newReq.Header = utilnet.CloneHeader(req.Header) newReq.URL = location + var proxyRoundTripper http.RoundTripper upgrade := false + proxyRoundTripper = handlingInfo.proxyRoundTripper + if proxyRoundTripper == nil { + http.Error(w, "", http.StatusNotFound) + return + } // we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req) if err != nil { @@ -205,5 +208,14 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic serviceNamespace: apiService.Spec.Service.Namespace, } newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig) + if newInfo.transportBuildingError == nil && r.proxyTransport.Dial != nil { + switch transport := newInfo.proxyRoundTripper.(type) { + case *http.Transport: + transport.Dial = r.proxyTransport.Dial + default: + newInfo.transportBuildingError = fmt.Errorf("Unable to set dialer for %s as rest transport is of type %T", apiService.Spec.Service.Name, newInfo.proxyRoundTripper) + glog.Warning(newInfo.transportBuildingError.Error()) + } + } r.handlingInfo.Store(newInfo) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index d03a79fa87d..b4258ecdd2d 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -170,8 +170,9 @@ func TestProxyHandler(t *testing.T) { func() { handler := &proxyHandler{ - localDelegate: http.NewServeMux(), - routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}, + localDelegate: http.NewServeMux(), + routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}, + proxyTransport: &http.Transport{}, } handler.contextMapper = &fakeRequestContextMapper{user: tc.user} server := httptest.NewServer(handler) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index ab0b153ae5b..559307c73fb 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -157,7 +157,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { return err } - server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate) + server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, nil) if err != nil { return err } diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index 896b2f69513..3a8fada6ee0 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -604,7 +604,11 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV kubeAPIServerOptions.SecureServing.BindPort = kubePort - kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions) + tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions) + if err != nil { + t.Fatal(err) + } + kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 87f401701f4..9b5a1852a85 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -112,7 +112,11 @@ func TestAggregatedAPIServer(t *testing.T) { kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name() kubeAPIServerOptions.Authorization.Mode = "RBAC" - kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions) + tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions) + if err != nil { + t.Fatal(err) + } + kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) }