Change to aggregator so it calls a user apiservice via its pod IP.

proxy_handler now uses the endpoint router to map the cluster IP to
appropriate endpoint (Pod) IP for the given resource.
Added code to allow aggregator routing to be optional.
Updated bazel build.
Fixes to cover JLiggit comments.
Added util ResourceLocation method based on Listers.
Fixed issues from verification steps.
Updated to add an interface to obfuscate some of the routing logic.
Collapsed cluster IP resolution in to the aggregator routing
implementation.
Added 2 simple unit tests for ResolveEndpoint
This commit is contained in:
Walter Fender 2017-04-20 16:23:00 -07:00
parent 2ada6e62d5
commit ad8a83a7c1
13 changed files with 343 additions and 19 deletions

View File

@ -810,6 +810,7 @@ function start-kube-apiserver {
params+=" --tls-cert-file=/etc/srv/kubernetes/server.cert"
params+=" --tls-private-key-file=/etc/srv/kubernetes/server.key"
params+=" --token-auth-file=/etc/srv/kubernetes/known_tokens.csv"
params+=" --enable-aggregator-routing=true"
if [[ -n "${KUBE_PASSWORD:-}" && -n "${KUBE_USER:-}" ]]; then
params+=" --basic-auth-file=/etc/srv/kubernetes/basic_auth.csv"
fi

View File

@ -1015,6 +1015,7 @@ function start-kube-apiserver {
params+=" --secure-port=443"
params+=" --tls-cert-file=${APISERVER_SERVER_CERT_PATH}"
params+=" --tls-private-key-file=${APISERVER_SERVER_KEY_PATH}"
params+=" --enable-aggregator-routing=true"
if [[ -e "${APISERVER_CLIENT_CERT_PATH}" ]] && [[ -e "${APISERVER_CLIENT_KEY_PATH}" ]]; then
params+=" --kubelet-client-certificate=${APISERVER_CLIENT_CERT_PATH}"
params+=" --kubelet-client-key=${APISERVER_CLIENT_KEY_PATH}"

View File

@ -79,11 +79,13 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
return nil, err
}
}
aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
EnableAggregatorRouting: commandOptions.EnableAggregatorRouting,
}
return aggregatorConfig, nil

View File

@ -68,6 +68,8 @@ type ServerRunOptions struct {
ProxyClientCertFile string
ProxyClientKeyFile string
EnableAggregatorRouting bool
}
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
@ -217,4 +219,7 @@ func (s *ServerRunOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ProxyClientKeyFile, "proxy-client-key-file", s.ProxyClientKeyFile,
"client certificate key used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server")
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endoints IP rather than cluster IP.")
}

View File

@ -190,6 +190,7 @@ dump-logs-on-failure
duration-sec
e2e-output-dir
e2e-verify-service-account
enable-aggregator-routing
enable-controller-attach-detach
enable-custom-metrics
enable-debugging-handlers

View File

@ -12,11 +12,18 @@ go_test(
name = "go_default_test",
srcs = [
"dial_test.go",
"proxy_test.go",
"transport_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
],
)
go_library(
@ -24,6 +31,7 @@ go_library(
srcs = [
"dial.go",
"doc.go",
"proxy.go",
"transport.go",
],
tags = ["automanaged"],
@ -31,8 +39,11 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/html:go_default_library",
"//vendor/golang.org/x/net/html/atom:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/third_party/forked/golang/netutil:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
],
)

View File

@ -0,0 +1,117 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"k8s.io/apimachinery/pkg/api/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
)
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string) (*url.URL, error) {
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
if !valid {
return nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
}
// If a port *number* was specified, find the corresponding service port name
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
svc, err := services.Services(namespace).Get(svcName)
if err != nil {
return nil, err
}
found := false
for _, svcPort := range svc.Spec.Ports {
if int64(svcPort.Port) == portNum {
// use the declared port's name
portStr = svcPort.Name
found = true
break
}
}
if !found {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
}
}
eps, err := endpoints.Endpoints(namespace).Get(svcName)
if err != nil {
return nil, err
}
if len(eps.Subsets) == 0 {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
}
// Pick a random Subset to start searching from.
ssSeed := rand.Intn(len(eps.Subsets))
// Find a Subset that has the port.
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
if len(ss.Addresses) == 0 {
continue
}
for i := range ss.Ports {
if ss.Ports[i].Name == portStr {
// Pick a random address.
ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
port := int(ss.Ports[i].Port)
return &url.URL{
Scheme: svcScheme,
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, nil
}
}
}
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
func ResolveCluster(services listersv1.ServiceLister, namespace, id string) (*url.URL, error) {
if len(id) == 0 {
return &url.URL{Scheme: "https"}, nil
}
destinationHost := id + "." + namespace + ".svc"
service, err := services.Services(namespace).Get(id)
if err != nil {
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}
switch {
// use IP from a clusterIP for these service types
case service.Spec.Type == v1.ServiceTypeClusterIP,
service.Spec.Type == v1.ServiceTypeNodePort,
service.Spec.Type == v1.ServiceTypeLoadBalancer:
return &url.URL{
Scheme: "https",
Host: service.Spec.ClusterIP,
}, nil
}
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"testing"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"net/http"
)
type serviceListerMock struct {
services []*v1.Service
err error
}
func (s *serviceListerMock) List(selector labels.Selector) (ret []*v1.Service, err error) {
return s.services, err
}
func (s *serviceListerMock) Services(namespace string) listersv1.ServiceNamespaceLister {
return nil
}
func (s *serviceListerMock) GetPodServices(pod *v1.Pod) ([]*v1.Service, error) {
return nil, nil
}
type endpointsListerMock struct {
endpoints []*v1.Endpoints
err error
}
func (e *endpointsListerMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}
func (e *endpointsListerMock) Endpoints(namespace string) listersv1.EndpointsNamespaceLister {
return endpointsNamespaceListMock{
endpoints: e.endpoints,
err: e.err,
}
}
type endpointsNamespaceListMock struct {
endpoints []*v1.Endpoints
err error
}
func (e endpointsNamespaceListMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}
func (e endpointsNamespaceListMock) Get(name string) (*v1.Endpoints, error) {
if len(e.endpoints) == 0 {
return nil, e.err
}
return e.endpoints[0], e.err
}
func TestNoEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
endpoints := &endpointsListerMock{err: errors.NewNotFound(v1.Resource("endpoints"), "dummy-svc")}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if url != nil {
t.Error("Should not have gotten back an URL")
}
if err == nil {
t.Error("Should have gotten an error")
}
se, ok := err.(*errors.StatusError)
if !ok {
t.Error("Should have gotten a status error not %T", err)
}
if se.ErrStatus.Code != http.StatusNotFound {
t.Error("Should have gotten a http 404 not %d", se.ErrStatus.Code)
}
}
func TestOneEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
address := v1.EndpointAddress{Hostname: "dummy-host", IP: "127.0.0.1"}
addresses := []v1.EndpointAddress{address}
port := v1.EndpointPort{Port: 443}
ports := []v1.EndpointPort{port}
endpoint := v1.EndpointSubset{Addresses: addresses, Ports: ports}
subsets := []v1.EndpointSubset{endpoint}
one := &v1.Endpoints{Subsets: subsets}
slice := []*v1.Endpoints{one}
endpoints := &endpointsListerMock{endpoints: slice}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if err != nil {
t.Errorf("Should not have gotten error %v", err)
}
if url == nil {
t.Error("Should not have gotten back an URL")
}
if url.Host != "127.0.0.1:443" {
t.Error("Should have gotten back a host of dummy-host not %s", url.Host)
}
}

View File

@ -65,6 +65,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/proxy:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -18,6 +18,7 @@ package apiserver
import (
"net/http"
"net/url"
"time"
"k8s.io/apimachinery/pkg/apimachinery/announced"
@ -30,8 +31,10 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/util/proxy"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/version"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
@ -70,6 +73,19 @@ func init() {
// legacyAPIServiceName is the fixed name of the only non-groupified API version
const legacyAPIServiceName = "v1."
type ServiceResolver interface {
ResolveEndpoint(namespace, name string) (*url.URL, error)
}
type aggregatorEndpointRouting struct {
services listersv1.ServiceLister
endpoints listersv1.EndpointsLister
}
type aggregatorClusterRouting struct {
services listersv1.ServiceLister
}
type Config struct {
GenericConfig *genericapiserver.Config
CoreAPIServerClient kubeclientset.Interface
@ -78,6 +94,9 @@ type Config struct {
// this to confirm the proxy's identity
ProxyClientCert []byte
ProxyClientKey []byte
// Indicates if the Aggregator should send to the cluster IP (false) or route to the endpoints IP (true)
EnableAggregatorRouting bool
}
// APIAggregator contains state for a Kubernetes cluster master/api server.
@ -104,6 +123,9 @@ type APIAggregator struct {
// provided for easier embedding
APIRegistrationInformers informers.SharedInformerFactory
// Information needed to determine routing for the aggregator
routing ServiceResolver
}
type completedConfig struct {
@ -128,6 +150,14 @@ func (c *Config) SkipComplete() completedConfig {
return completedConfig{c}
}
func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name)
}
func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveCluster(r.services, namespace, name)
}
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New(delegationTarget) // completion is done in Complete, no need for a second time
@ -145,6 +175,18 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
)
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
var routing ServiceResolver
if c.EnableAggregatorRouting {
routing = &aggregatorEndpointRouting{
services: kubeInformers.Core().V1().Services().Lister(),
endpoints: kubeInformers.Core().V1().Endpoints().Lister(),
}
} else {
routing = &aggregatorClusterRouting{
services: kubeInformers.Core().V1().Services().Lister(),
}
}
s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
@ -155,6 +197,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
routing: routing,
}
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs)
@ -204,11 +247,11 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, destinationHost string) {
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
// since they are wired against listers because they require multiple resources to respond
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
proxyHandler.updateAPIService(apiService, destinationHost)
proxyHandler.updateAPIService(apiService)
return
}
@ -224,8 +267,9 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
localDelegate: s.delegateHandler,
proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey,
routing: s.routing,
}
proxyHandler.updateAPIService(apiService, destinationHost)
proxyHandler.updateAPIService(apiService)
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.PostGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

View File

@ -39,7 +39,7 @@ import (
)
type APIHandlerManager interface {
AddAPIService(apiService *apiregistration.APIService, destinationHost string)
AddAPIService(apiService *apiregistration.APIService)
RemoveAPIService(apiServiceName string)
}
@ -102,8 +102,7 @@ func (c *APIServiceRegistrationController) sync(key string) error {
return nil
}
// TODO move the destination host to status so that you can see where its going
c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService))
c.apiHandlerManager.AddAPIService(apiService)
return nil
}

View File

@ -50,6 +50,9 @@ type proxyHandler struct {
proxyClientCert []byte
proxyClientKey []byte
// Endpoints based routing to map from cluster IP to routable IP
routing ServiceResolver
handlingInfo atomic.Value
}
@ -64,8 +67,10 @@ type proxyHandlingInfo struct {
transportBuildingError error
// proxyRoundTripper is the re-useable portion of the transport. It does not vary with any request.
proxyRoundTripper http.RoundTripper
// destinationHost is the hostname of the backing API server
destinationHost string
// serviceName is the name of the service this handler proxies to
serviceName string
// namespace is the namespace the service lives in
serviceNamespace string
}
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -108,7 +113,12 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// write a new location based on the existing request pointed at the target service
location := &url.URL{}
location.Scheme = "https"
location.Host = handlingInfo.destinationHost
rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
if err != nil {
http.Error(w, "missing route", http.StatusInternalServerError)
return
}
location.Host = rloc.Host
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
@ -119,7 +129,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
upgrade := false
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req)
proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -175,14 +185,13 @@ func (r *responder) Error(err error) {
// these methods provide locked access to fields
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService, destinationHost string) {
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) {
if apiService.Spec.Service == nil {
r.handlingInfo.Store(proxyHandlingInfo{local: true})
return
}
newInfo := proxyHandlingInfo{
destinationHost: destinationHost,
restConfig: &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify,
@ -192,6 +201,8 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic
CAData: apiService.Spec.CABundle,
},
},
serviceName: apiService.Spec.Service.Name,
serviceNamespace: apiService.Spec.Service.Namespace,
}
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
r.handlingInfo.Store(newInfo)

View File

@ -31,6 +31,7 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"net/url"
)
type targetHTTPHandler struct {
@ -78,6 +79,17 @@ func (*fakeRequestContextMapper) Update(req *http.Request, context genericapireq
return nil
}
type mockedRouter struct {
destinationHost string
}
func (r *mockedRouter) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return &url.URL{
Scheme: "https",
Host: r.destinationHost,
}, nil
}
func TestProxyHandler(t *testing.T) {
target := &targetHTTPHandler{}
targetServer := httptest.NewTLSServer(target)
@ -159,15 +171,15 @@ func TestProxyHandler(t *testing.T) {
func() {
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()},
}
handler.contextMapper = &fakeRequestContextMapper{user: tc.user}
server := httptest.NewServer(handler)
defer server.Close()
if tc.apiService != nil {
handler.updateAPIService(tc.apiService, tc.apiService.Spec.Service.Name+"."+tc.apiService.Spec.Service.Namespace+".svc")
handler.updateAPIService(tc.apiService)
curr := handler.handlingInfo.Load().(proxyHandlingInfo)
curr.destinationHost = targetServer.Listener.Addr().String()
handler.handlingInfo.Store(curr)
}