diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index bdf8b8a2e87..28dccf84b3e 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -43,7 +43,7 @@ import ( "k8s.io/kubernetes/pkg/master/thirdparty" ) -func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions) (*aggregatorapiserver.Config, error) { +func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions, proxyTransport *http.Transport) (*aggregatorapiserver.Config, error) { // make a shallow copy to let us twiddle a few things // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator genericConfig := kubeAPIServerConfig @@ -80,14 +80,15 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command CoreAPIServerClient: client, ProxyClientCert: certBytes, ProxyClientKey: keyBytes, + ProxyTransport: proxyTransport, EnableAggregatorRouting: commandOptions.EnableAggregatorRouting, } return aggregatorConfig, nil } -func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, proxyTransport *http.Transport) (*aggregatorapiserver.APIAggregator, error) { - aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, proxyTransport) +func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) { + aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer) if err != nil { return nil, err } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 6335dfe79ec..68b864312c5 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -104,11 +104,11 @@ cluster's shared state through which all other components interact.`, // Run runs the specified APIServer. This should never exit. func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { - tunneler, proxyTransport, err := CreateDialer(runOptions) + nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions) if err != nil { return err } - kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, tunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport) if err != nil { return err } @@ -147,11 +147,11 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { kubeAPIServer.GenericAPIServer.PrepareRun() // aggregator comes last in the chain - aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions) + aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, proxyTransport) if err != nil { return err } - aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers, proxyTransport) + aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines return err @@ -181,8 +181,8 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g return kubeAPIServer, nil } -// CreateDialer creates the dialer infrastructure and makes it available to APIServer and Aggregator -func CreateDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) { +// CreateNodeDialer creates the dialer infrastructure to connect to the nodes. +func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) { // Setup nodeTunneler if needed var nodeTunneler tunneler.Tunneler var proxyDialerFn utilnet.DialFunc @@ -214,8 +214,6 @@ func CreateDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transpo } nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey) - // Use the nodeTunneler's dialer to connect to the kubelet - s.KubeletConfig.Dial = nodeTunneler.Dial // Use the nodeTunneler's dialer when proxying to pods, services, and nodes proxyDialerFn = nodeTunneler.Dial } @@ -315,6 +313,11 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele MasterCount: s.MasterCount, } + if nodeTunneler != nil { + // Use the nodeTunneler's dialer to connect to the kubelet + config.KubeletClientConfig.Dial = nodeTunneler.Dial + } + return config, sharedInformers, insecureServingOptions, nil } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 2ac2ec700c6..611cfec0d82 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -42,10 +42,11 @@ import ( "bytes" "fmt" + "io" + "github.com/go-openapi/spec" "github.com/golang/glog" "github.com/pkg/errors" - "io" "k8s.io/apiserver/pkg/server/openapi" "k8s.io/client-go/transport" "k8s.io/kube-aggregator/pkg/apis/apiregistration" @@ -109,6 +110,7 @@ type Config struct { // this to confirm the proxy's identity ProxyClientCert []byte ProxyClientKey []byte + ProxyTransport *http.Transport // Indicates if the Aggregator should send to the cluster IP (false) or route to the endpoints IP (true) EnableAggregatorRouting bool @@ -193,7 +195,7 @@ func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url } // New returns a new instance of APIAggregator from the given config. -func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, proxyTransport *http.Transport) (*APIAggregator, error) { +func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time if err != nil { return nil, err @@ -228,7 +230,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg contextMapper: c.GenericConfig.RequestContextMapper, proxyClientCert: c.ProxyClientCert, proxyClientKey: c.ProxyClientKey, - proxyTransport: proxyTransport, + proxyTransport: c.ProxyTransport, proxyHandlers: map[string]*proxyHandler{}, apiServiceSpecs: map[string]*spec.Swagger{}, toLoadAPISpec: map[string]int{}, diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index eb5db565244..43e99f88db6 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -19,11 +19,12 @@ package apiserver import ( "context" "fmt" - "github.com/golang/glog" "net/http" "net/url" "sync/atomic" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/httpstream/spdy" @@ -124,15 +125,13 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { newReq.Header = utilnet.CloneHeader(req.Header) newReq.URL = location - var proxyRoundTripper http.RoundTripper - upgrade := false - proxyRoundTripper = handlingInfo.proxyRoundTripper - if proxyRoundTripper == nil { + if handlingInfo.proxyRoundTripper == nil { http.Error(w, "", http.StatusNotFound) return } + // we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers - proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req) + proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -213,7 +212,7 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic case *http.Transport: transport.Dial = r.proxyTransport.Dial default: - newInfo.transportBuildingError = fmt.Errorf("Unable to set dialer for %s as rest transport is of type %T", apiService.Spec.Service.Name, newInfo.proxyRoundTripper) + newInfo.transportBuildingError = fmt.Errorf("unable to set dialer for %s/%s as rest transport is of type %T", apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, newInfo.proxyRoundTripper) glog.Warning(newInfo.transportBuildingError.Error()) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index 559307c73fb..ab0b153ae5b 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -157,7 +157,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { return err } - server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, nil) + server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate) if err != nil { return err } diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index 3a8fada6ee0..e2e56845161 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -604,7 +604,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV kubeAPIServerOptions.SecureServing.BindPort = kubePort - tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions) + tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions) if err != nil { t.Fatal(err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 9b5a1852a85..f2fb02a2fa0 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -112,7 +112,7 @@ func TestAggregatedAPIServer(t *testing.T) { kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name() kubeAPIServerOptions.Authorization.Mode = "RBAC" - tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions) + tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions) if err != nil { t.Fatal(err) }