From ad8a83a7c1741efb507d924a17eb809748ee2e06 Mon Sep 17 00:00:00 2001 From: Walter Fender Date: Thu, 20 Apr 2017 16:23:00 -0700 Subject: [PATCH] Change to aggregator so it calls a user apiservice via its pod IP. proxy_handler now uses the endpoint router to map the cluster IP to appropriate endpoint (Pod) IP for the given resource. Added code to allow aggregator routing to be optional. Updated bazel build. Fixes to cover JLiggit comments. Added util ResourceLocation method based on Listers. Fixed issues from verification steps. Updated to add an interface to obfuscate some of the routing logic. Collapsed cluster IP resolution in to the aggregator routing implementation. Added 2 simple unit tests for ResolveEndpoint --- .../gce/container-linux/configure-helper.sh | 1 + cluster/gce/gci/configure-helper.sh | 1 + cmd/kube-apiserver/app/aggregator.go | 10 +- cmd/kube-apiserver/app/options/options.go | 5 + hack/verify-flags/known-flags.txt | 1 + .../src/k8s.io/apiserver/pkg/util/proxy/BUILD | 13 +- .../k8s.io/apiserver/pkg/util/proxy/proxy.go | 117 +++++++++++++++++ .../apiserver/pkg/util/proxy/proxy_test.go | 119 ++++++++++++++++++ .../kube-aggregator/pkg/apiserver/BUILD | 1 + .../pkg/apiserver/apiserver.go | 50 +++++++- .../pkg/apiserver/apiservice_controller.go | 5 +- .../pkg/apiserver/handler_proxy.go | 23 +++- .../pkg/apiserver/handler_proxy_test.go | 16 ++- 13 files changed, 343 insertions(+), 19 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go diff --git a/cluster/gce/container-linux/configure-helper.sh b/cluster/gce/container-linux/configure-helper.sh index 7924fd3cff1..52c8a029a13 100755 --- a/cluster/gce/container-linux/configure-helper.sh +++ b/cluster/gce/container-linux/configure-helper.sh @@ -810,6 +810,7 @@ function start-kube-apiserver { params+=" --tls-cert-file=/etc/srv/kubernetes/server.cert" params+=" --tls-private-key-file=/etc/srv/kubernetes/server.key" params+=" --token-auth-file=/etc/srv/kubernetes/known_tokens.csv" + params+=" --enable-aggregator-routing=true" if [[ -n "${KUBE_PASSWORD:-}" && -n "${KUBE_USER:-}" ]]; then params+=" --basic-auth-file=/etc/srv/kubernetes/basic_auth.csv" fi diff --git a/cluster/gce/gci/configure-helper.sh b/cluster/gce/gci/configure-helper.sh index 24d1db1984e..b4e5becf3bd 100644 --- a/cluster/gce/gci/configure-helper.sh +++ b/cluster/gce/gci/configure-helper.sh @@ -1015,6 +1015,7 @@ function start-kube-apiserver { params+=" --secure-port=443" params+=" --tls-cert-file=${APISERVER_SERVER_CERT_PATH}" params+=" --tls-private-key-file=${APISERVER_SERVER_KEY_PATH}" + params+=" --enable-aggregator-routing=true" if [[ -e "${APISERVER_CLIENT_CERT_PATH}" ]] && [[ -e "${APISERVER_CLIENT_KEY_PATH}" ]]; then params+=" --kubelet-client-certificate=${APISERVER_CLIENT_CERT_PATH}" params+=" --kubelet-client-key=${APISERVER_CLIENT_KEY_PATH}" diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 2ced761f644..3aa6279b93d 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -79,11 +79,13 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command return nil, err } } + aggregatorConfig := &aggregatorapiserver.Config{ - GenericConfig: &genericConfig, - CoreAPIServerClient: client, - ProxyClientCert: certBytes, - ProxyClientKey: keyBytes, + GenericConfig: &genericConfig, + CoreAPIServerClient: client, + ProxyClientCert: certBytes, + ProxyClientKey: keyBytes, + EnableAggregatorRouting: commandOptions.EnableAggregatorRouting, } return aggregatorConfig, nil diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 8ecc905b436..ee69ed645b0 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -68,6 +68,8 @@ type ServerRunOptions struct { ProxyClientCertFile string ProxyClientKeyFile string + + EnableAggregatorRouting bool } // NewServerRunOptions creates a new ServerRunOptions object with default parameters @@ -217,4 +219,7 @@ func (s *ServerRunOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.ProxyClientKeyFile, "proxy-client-key-file", s.ProxyClientKeyFile, "client certificate key used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server") + fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting, + "Turns on aggregator routing requests to endoints IP rather than cluster IP.") + } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index cf0b1e31268..c8c0fa86710 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -190,6 +190,7 @@ dump-logs-on-failure duration-sec e2e-output-dir e2e-verify-service-account +enable-aggregator-routing enable-controller-attach-detach enable-custom-metrics enable-debugging-handlers diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/BUILD b/staging/src/k8s.io/apiserver/pkg/util/proxy/BUILD index 1d186a60f30..5f13fa1238f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/BUILD @@ -12,11 +12,18 @@ go_test( name = "go_default_test", srcs = [ "dial_test.go", + "proxy_test.go", "transport_test.go", ], library = ":go_default_library", tags = ["automanaged"], - deps = ["//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", + ], ) go_library( @@ -24,6 +31,7 @@ go_library( srcs = [ "dial.go", "doc.go", + "proxy.go", "transport.go", ], tags = ["automanaged"], @@ -31,8 +39,11 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/golang.org/x/net/html:go_default_library", "//vendor/golang.org/x/net/html/atom:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/third_party/forked/golang/netutil:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go new file mode 100644 index 00000000000..925370d59e0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go @@ -0,0 +1,117 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "fmt" + "math/rand" + "net" + "net/url" + "strconv" + + "k8s.io/apimachinery/pkg/api/errors" + utilnet "k8s.io/apimachinery/pkg/util/net" + listersv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/pkg/api/v1" +) + +// ResourceLocation returns a URL to which one can send traffic for the specified service. +func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string) (*url.URL, error) { + // Allow ID as "svcname", "svcname:port", or "scheme:svcname:port". + svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id) + if !valid { + return nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) + } + + // If a port *number* was specified, find the corresponding service port name + if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil { + svc, err := services.Services(namespace).Get(svcName) + if err != nil { + return nil, err + } + found := false + for _, svcPort := range svc.Spec.Ports { + if int64(svcPort.Port) == portNum { + // use the declared port's name + portStr = svcPort.Name + found = true + break + } + } + if !found { + return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName)) + } + } + + eps, err := endpoints.Endpoints(namespace).Get(svcName) + if err != nil { + return nil, err + } + if len(eps.Subsets) == 0 { + return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName)) + } + // Pick a random Subset to start searching from. + ssSeed := rand.Intn(len(eps.Subsets)) + // Find a Subset that has the port. + for ssi := 0; ssi < len(eps.Subsets); ssi++ { + ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)] + if len(ss.Addresses) == 0 { + continue + } + for i := range ss.Ports { + if ss.Ports[i].Name == portStr { + // Pick a random address. + ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP + port := int(ss.Ports[i].Port) + return &url.URL{ + Scheme: svcScheme, + Host: net.JoinHostPort(ip, strconv.Itoa(port)), + }, nil + } + } + } + return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id)) +} + +func ResolveCluster(services listersv1.ServiceLister, namespace, id string) (*url.URL, error) { + if len(id) == 0 { + return &url.URL{Scheme: "https"}, nil + } + + destinationHost := id + "." + namespace + ".svc" + service, err := services.Services(namespace).Get(id) + if err != nil { + return &url.URL{ + Scheme: "https", + Host: destinationHost, + }, nil + } + switch { + // use IP from a clusterIP for these service types + case service.Spec.Type == v1.ServiceTypeClusterIP, + service.Spec.Type == v1.ServiceTypeNodePort, + service.Spec.Type == v1.ServiceTypeLoadBalancer: + return &url.URL{ + Scheme: "https", + Host: service.Spec.ClusterIP, + }, nil + } + return &url.URL{ + Scheme: "https", + Host: destinationHost, + }, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go new file mode 100644 index 00000000000..101e50d6c24 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "testing" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + listersv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/pkg/api/v1" + "net/http" +) + +type serviceListerMock struct { + services []*v1.Service + err error +} + +func (s *serviceListerMock) List(selector labels.Selector) (ret []*v1.Service, err error) { + return s.services, err +} + +func (s *serviceListerMock) Services(namespace string) listersv1.ServiceNamespaceLister { + return nil +} + +func (s *serviceListerMock) GetPodServices(pod *v1.Pod) ([]*v1.Service, error) { + return nil, nil +} + +type endpointsListerMock struct { + endpoints []*v1.Endpoints + err error +} + +func (e *endpointsListerMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) { + return e.endpoints, e.err +} + +func (e *endpointsListerMock) Endpoints(namespace string) listersv1.EndpointsNamespaceLister { + return endpointsNamespaceListMock{ + endpoints: e.endpoints, + err: e.err, + } +} + +type endpointsNamespaceListMock struct { + endpoints []*v1.Endpoints + err error +} + +func (e endpointsNamespaceListMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) { + return e.endpoints, e.err +} + +func (e endpointsNamespaceListMock) Get(name string) (*v1.Endpoints, error) { + if len(e.endpoints) == 0 { + return nil, e.err + } + return e.endpoints[0], e.err +} + +func TestNoEndpointNoPort(t *testing.T) { + services := &serviceListerMock{} + endpoints := &endpointsListerMock{err: errors.NewNotFound(v1.Resource("endpoints"), "dummy-svc")} + url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc") + if url != nil { + t.Error("Should not have gotten back an URL") + } + if err == nil { + t.Error("Should have gotten an error") + } + se, ok := err.(*errors.StatusError) + if !ok { + t.Error("Should have gotten a status error not %T", err) + } + if se.ErrStatus.Code != http.StatusNotFound { + t.Error("Should have gotten a http 404 not %d", se.ErrStatus.Code) + } +} + +func TestOneEndpointNoPort(t *testing.T) { + services := &serviceListerMock{} + address := v1.EndpointAddress{Hostname: "dummy-host", IP: "127.0.0.1"} + addresses := []v1.EndpointAddress{address} + port := v1.EndpointPort{Port: 443} + ports := []v1.EndpointPort{port} + endpoint := v1.EndpointSubset{Addresses: addresses, Ports: ports} + subsets := []v1.EndpointSubset{endpoint} + one := &v1.Endpoints{Subsets: subsets} + slice := []*v1.Endpoints{one} + endpoints := &endpointsListerMock{endpoints: slice} + url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc") + if err != nil { + t.Errorf("Should not have gotten error %v", err) + } + if url == nil { + t.Error("Should not have gotten back an URL") + } + if url.Host != "127.0.0.1:443" { + t.Error("Should have gotten back a host of dummy-host not %s", url.Host) + } + +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index 26c6515b151..062f4e923da 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -65,6 +65,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/proxy:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 87e0d3e695a..dbaa6b1b005 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -18,6 +18,7 @@ package apiserver import ( "net/http" + "net/url" "time" "k8s.io/apimachinery/pkg/apimachinery/announced" @@ -30,8 +31,10 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/util/proxy" kubeinformers "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" + listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/pkg/version" "k8s.io/kube-aggregator/pkg/apis/apiregistration" @@ -70,6 +73,19 @@ func init() { // legacyAPIServiceName is the fixed name of the only non-groupified API version const legacyAPIServiceName = "v1." +type ServiceResolver interface { + ResolveEndpoint(namespace, name string) (*url.URL, error) +} + +type aggregatorEndpointRouting struct { + services listersv1.ServiceLister + endpoints listersv1.EndpointsLister +} + +type aggregatorClusterRouting struct { + services listersv1.ServiceLister +} + type Config struct { GenericConfig *genericapiserver.Config CoreAPIServerClient kubeclientset.Interface @@ -78,6 +94,9 @@ type Config struct { // this to confirm the proxy's identity ProxyClientCert []byte ProxyClientKey []byte + + // Indicates if the Aggregator should send to the cluster IP (false) or route to the endpoints IP (true) + EnableAggregatorRouting bool } // APIAggregator contains state for a Kubernetes cluster master/api server. @@ -104,6 +123,9 @@ type APIAggregator struct { // provided for easier embedding APIRegistrationInformers informers.SharedInformerFactory + + // Information needed to determine routing for the aggregator + routing ServiceResolver } type completedConfig struct { @@ -128,6 +150,14 @@ func (c *Config) SkipComplete() completedConfig { return completedConfig{c} } +func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) { + return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name) +} + +func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) { + return proxy.ResolveCluster(r.services, namespace, name) +} + // New returns a new instance of APIAggregator from the given config. func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { genericServer, err := c.Config.GenericConfig.SkipComplete().New(delegationTarget) // completion is done in Complete, no need for a second time @@ -145,6 +175,18 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg ) kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute) + var routing ServiceResolver + if c.EnableAggregatorRouting { + routing = &aggregatorEndpointRouting{ + services: kubeInformers.Core().V1().Services().Lister(), + endpoints: kubeInformers.Core().V1().Endpoints().Lister(), + } + } else { + routing = &aggregatorClusterRouting{ + services: kubeInformers.Core().V1().Services().Lister(), + } + } + s := &APIAggregator{ GenericAPIServer: genericServer, delegateHandler: delegationTarget.UnprotectedHandler(), @@ -155,6 +197,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg handledGroups: sets.String{}, lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), APIRegistrationInformers: informerFactory, + routing: routing, } apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs) @@ -204,11 +247,11 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. // It's a slow moving API, so its ok to run the controller on a single thread -func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, destinationHost string) { +func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { // if the proxyHandler already exists, it needs to be updated. The aggregation bits do not // since they are wired against listers because they require multiple resources to respond if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists { - proxyHandler.updateAPIService(apiService, destinationHost) + proxyHandler.updateAPIService(apiService) return } @@ -224,8 +267,9 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de localDelegate: s.delegateHandler, proxyClientCert: s.proxyClientCert, proxyClientKey: s.proxyClientKey, + routing: s.routing, } - proxyHandler.updateAPIService(apiService, destinationHost) + proxyHandler.updateAPIService(apiService) s.proxyHandlers[apiService.Name] = proxyHandler s.GenericAPIServer.Handler.PostGoRestfulMux.Handle(proxyPath, proxyHandler) s.GenericAPIServer.Handler.PostGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go index b8d9e07f842..9eecf58ff17 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go @@ -39,7 +39,7 @@ import ( ) type APIHandlerManager interface { - AddAPIService(apiService *apiregistration.APIService, destinationHost string) + AddAPIService(apiService *apiregistration.APIService) RemoveAPIService(apiServiceName string) } @@ -102,8 +102,7 @@ func (c *APIServiceRegistrationController) sync(key string) error { return nil } - // TODO move the destination host to status so that you can see where its going - c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService)) + c.apiHandlerManager.AddAPIService(apiService) return nil } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 9820a6281fd..397b0ab528f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -50,6 +50,9 @@ type proxyHandler struct { proxyClientCert []byte proxyClientKey []byte + // Endpoints based routing to map from cluster IP to routable IP + routing ServiceResolver + handlingInfo atomic.Value } @@ -64,8 +67,10 @@ type proxyHandlingInfo struct { transportBuildingError error // proxyRoundTripper is the re-useable portion of the transport. It does not vary with any request. proxyRoundTripper http.RoundTripper - // destinationHost is the hostname of the backing API server - destinationHost string + // serviceName is the name of the service this handler proxies to + serviceName string + // namespace is the namespace the service lives in + serviceNamespace string } func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -108,7 +113,12 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // write a new location based on the existing request pointed at the target service location := &url.URL{} location.Scheme = "https" - location.Host = handlingInfo.destinationHost + rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) + if err != nil { + http.Error(w, "missing route", http.StatusInternalServerError) + return + } + location.Host = rloc.Host location.Path = req.URL.Path location.RawQuery = req.URL.Query().Encode() @@ -119,7 +129,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { upgrade := false // we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers - proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req) + proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -175,14 +185,13 @@ func (r *responder) Error(err error) { // these methods provide locked access to fields -func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService, destinationHost string) { +func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) { if apiService.Spec.Service == nil { r.handlingInfo.Store(proxyHandlingInfo{local: true}) return } newInfo := proxyHandlingInfo{ - destinationHost: destinationHost, restConfig: &restclient.Config{ TLSClientConfig: restclient.TLSClientConfig{ Insecure: apiService.Spec.InsecureSkipTLSVerify, @@ -192,6 +201,8 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic CAData: apiService.Spec.CABundle, }, }, + serviceName: apiService.Spec.Service.Name, + serviceNamespace: apiService.Spec.Service.Namespace, } newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig) r.handlingInfo.Store(newInfo) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 046e0c9f450..d03a79fa87d 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -31,6 +31,7 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/kube-aggregator/pkg/apis/apiregistration" + "net/url" ) type targetHTTPHandler struct { @@ -78,6 +79,17 @@ func (*fakeRequestContextMapper) Update(req *http.Request, context genericapireq return nil } +type mockedRouter struct { + destinationHost string +} + +func (r *mockedRouter) ResolveEndpoint(namespace, name string) (*url.URL, error) { + return &url.URL{ + Scheme: "https", + Host: r.destinationHost, + }, nil +} + func TestProxyHandler(t *testing.T) { target := &targetHTTPHandler{} targetServer := httptest.NewTLSServer(target) @@ -159,15 +171,15 @@ func TestProxyHandler(t *testing.T) { func() { handler := &proxyHandler{ localDelegate: http.NewServeMux(), + routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}, } handler.contextMapper = &fakeRequestContextMapper{user: tc.user} server := httptest.NewServer(handler) defer server.Close() if tc.apiService != nil { - handler.updateAPIService(tc.apiService, tc.apiService.Spec.Service.Name+"."+tc.apiService.Spec.Service.Namespace+".svc") + handler.updateAPIService(tc.apiService) curr := handler.handlingInfo.Load().(proxyHandlingInfo) - curr.destinationHost = targetServer.Listener.Addr().String() handler.handlingInfo.Store(curr) }