kube-apiserver: cleanup node proxy setup code

This commit is contained in:
Dr. Stefan Schimanski 2017-06-08 18:19:12 +02:00
parent 88e1ecb4b3
commit 342a8fc657
7 changed files with 29 additions and 24 deletions

View File

@ -43,7 +43,7 @@ import (
"k8s.io/kubernetes/pkg/master/thirdparty"
)
func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions) (*aggregatorapiserver.Config, error) {
func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions, proxyTransport *http.Transport) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
genericConfig := kubeAPIServerConfig
@ -80,14 +80,15 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
ProxyTransport: proxyTransport,
EnableAggregatorRouting: commandOptions.EnableAggregatorRouting,
}
return aggregatorConfig, nil
}
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, proxyTransport *http.Transport) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, proxyTransport)
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, kubeInformers informers.SharedInformerFactory, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}

View File

@ -104,11 +104,11 @@ cluster's shared state through which all other components interact.`,
// Run runs the specified APIServer. This should never exit.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
tunneler, proxyTransport, err := CreateDialer(runOptions)
nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions)
if err != nil {
return err
}
kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, tunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
if err != nil {
return err
}
@ -147,11 +147,11 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
kubeAPIServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, proxyTransport)
if err != nil {
return err
}
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers, proxyTransport)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err
@ -181,8 +181,8 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g
return kubeAPIServer, nil
}
// CreateDialer creates the dialer infrastructure and makes it available to APIServer and Aggregator
func CreateDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
// Setup nodeTunneler if needed
var nodeTunneler tunneler.Tunneler
var proxyDialerFn utilnet.DialFunc
@ -214,8 +214,6 @@ func CreateDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transpo
}
nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
// Use the nodeTunneler's dialer to connect to the kubelet
s.KubeletConfig.Dial = nodeTunneler.Dial
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
proxyDialerFn = nodeTunneler.Dial
}
@ -315,6 +313,11 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele
MasterCount: s.MasterCount,
}
if nodeTunneler != nil {
// Use the nodeTunneler's dialer to connect to the kubelet
config.KubeletClientConfig.Dial = nodeTunneler.Dial
}
return config, sharedInformers, insecureServingOptions, nil
}

View File

@ -42,10 +42,11 @@ import (
"bytes"
"fmt"
"io"
"github.com/go-openapi/spec"
"github.com/golang/glog"
"github.com/pkg/errors"
"io"
"k8s.io/apiserver/pkg/server/openapi"
"k8s.io/client-go/transport"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
@ -109,6 +110,7 @@ type Config struct {
// this to confirm the proxy's identity
ProxyClientCert []byte
ProxyClientKey []byte
ProxyTransport *http.Transport
// Indicates if the Aggregator should send to the cluster IP (false) or route to the endpoints IP (true)
EnableAggregatorRouting bool
@ -193,7 +195,7 @@ func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url
}
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, proxyTransport *http.Transport) (*APIAggregator, error) {
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
@ -228,7 +230,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
contextMapper: c.GenericConfig.RequestContextMapper,
proxyClientCert: c.ProxyClientCert,
proxyClientKey: c.ProxyClientKey,
proxyTransport: proxyTransport,
proxyTransport: c.ProxyTransport,
proxyHandlers: map[string]*proxyHandler{},
apiServiceSpecs: map[string]*spec.Swagger{},
toLoadAPISpec: map[string]int{},

View File

@ -19,11 +19,12 @@ package apiserver
import (
"context"
"fmt"
"github.com/golang/glog"
"net/http"
"net/url"
"sync/atomic"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
@ -124,15 +125,13 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newReq.Header = utilnet.CloneHeader(req.Header)
newReq.URL = location
var proxyRoundTripper http.RoundTripper
upgrade := false
proxyRoundTripper = handlingInfo.proxyRoundTripper
if proxyRoundTripper == nil {
if handlingInfo.proxyRoundTripper == nil {
http.Error(w, "", http.StatusNotFound)
return
}
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req)
proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -213,7 +212,7 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic
case *http.Transport:
transport.Dial = r.proxyTransport.Dial
default:
newInfo.transportBuildingError = fmt.Errorf("Unable to set dialer for %s as rest transport is of type %T", apiService.Spec.Service.Name, newInfo.proxyRoundTripper)
newInfo.transportBuildingError = fmt.Errorf("unable to set dialer for %s/%s as rest transport is of type %T", apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, newInfo.proxyRoundTripper)
glog.Warning(newInfo.transportBuildingError.Error())
}
}

View File

@ -157,7 +157,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
return err
}
server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, nil)
server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate)
if err != nil {
return err
}

View File

@ -604,7 +604,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
kubeAPIServerOptions.SecureServing.BindPort = kubePort
tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions)
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}

View File

@ -112,7 +112,7 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
kubeAPIServerOptions.Authorization.Mode = "RBAC"
tunneler, proxyTransport, err := app.CreateDialer(kubeAPIServerOptions)
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}