Merge pull request #44742 from cheftako/aggregate

Automatic merge from submit-queue (batch tested with PRs 46302, 44597, 44742, 46554)

Change to aggregator so it calls a user apiservice via its pod IP.

proxy_handler now does a sideways call to lookup the pod IPs for aservice.
It will then pick a random pod IP to forward the use apiserver request to.

**What this PR does / why we need it**: It allows the aggregator to work without setting up the full network stack on the kube master (i.e. with kube-dns or kube-proxy)

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #44619

**Special notes for your reviewer**:

**Release note**:

```release-note
```
This commit is contained in:
Kubernetes Submit Queue 2017-05-26 20:39:56 -07:00 committed by GitHub
commit fdb4fa689e
13 changed files with 343 additions and 19 deletions

View File

@ -810,6 +810,7 @@ function start-kube-apiserver {
params+=" --tls-cert-file=/etc/srv/kubernetes/server.cert"
params+=" --tls-private-key-file=/etc/srv/kubernetes/server.key"
params+=" --token-auth-file=/etc/srv/kubernetes/known_tokens.csv"
params+=" --enable-aggregator-routing=true"
if [[ -n "${KUBE_PASSWORD:-}" && -n "${KUBE_USER:-}" ]]; then
params+=" --basic-auth-file=/etc/srv/kubernetes/basic_auth.csv"
fi

View File

@ -1015,6 +1015,7 @@ function start-kube-apiserver {
params+=" --secure-port=443"
params+=" --tls-cert-file=${APISERVER_SERVER_CERT_PATH}"
params+=" --tls-private-key-file=${APISERVER_SERVER_KEY_PATH}"
params+=" --enable-aggregator-routing=true"
if [[ -e "${APISERVER_CLIENT_CERT_PATH}" ]] && [[ -e "${APISERVER_CLIENT_KEY_PATH}" ]]; then
params+=" --kubelet-client-certificate=${APISERVER_CLIENT_CERT_PATH}"
params+=" --kubelet-client-key=${APISERVER_CLIENT_KEY_PATH}"

View File

@ -79,11 +79,13 @@ func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, command
return nil, err
}
}
aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
EnableAggregatorRouting: commandOptions.EnableAggregatorRouting,
}
return aggregatorConfig, nil

View File

@ -68,6 +68,8 @@ type ServerRunOptions struct {
ProxyClientCertFile string
ProxyClientKeyFile string
EnableAggregatorRouting bool
}
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
@ -217,4 +219,7 @@ func (s *ServerRunOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ProxyClientKeyFile, "proxy-client-key-file", s.ProxyClientKeyFile,
"client certificate key used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server")
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endoints IP rather than cluster IP.")
}

View File

@ -190,6 +190,7 @@ dump-logs-on-failure
duration-sec
e2e-output-dir
e2e-verify-service-account
enable-aggregator-routing
enable-controller-attach-detach
enable-custom-metrics
enable-debugging-handlers

View File

@ -12,11 +12,18 @@ go_test(
name = "go_default_test",
srcs = [
"dial_test.go",
"proxy_test.go",
"transport_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
],
)
go_library(
@ -24,6 +31,7 @@ go_library(
srcs = [
"dial.go",
"doc.go",
"proxy.go",
"transport.go",
],
tags = ["automanaged"],
@ -31,8 +39,11 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/html:go_default_library",
"//vendor/golang.org/x/net/html/atom:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/third_party/forked/golang/netutil:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
],
)

View File

@ -0,0 +1,117 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"k8s.io/apimachinery/pkg/api/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
)
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string) (*url.URL, error) {
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
if !valid {
return nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
}
// If a port *number* was specified, find the corresponding service port name
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
svc, err := services.Services(namespace).Get(svcName)
if err != nil {
return nil, err
}
found := false
for _, svcPort := range svc.Spec.Ports {
if int64(svcPort.Port) == portNum {
// use the declared port's name
portStr = svcPort.Name
found = true
break
}
}
if !found {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
}
}
eps, err := endpoints.Endpoints(namespace).Get(svcName)
if err != nil {
return nil, err
}
if len(eps.Subsets) == 0 {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
}
// Pick a random Subset to start searching from.
ssSeed := rand.Intn(len(eps.Subsets))
// Find a Subset that has the port.
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
if len(ss.Addresses) == 0 {
continue
}
for i := range ss.Ports {
if ss.Ports[i].Name == portStr {
// Pick a random address.
ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
port := int(ss.Ports[i].Port)
return &url.URL{
Scheme: svcScheme,
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, nil
}
}
}
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
func ResolveCluster(services listersv1.ServiceLister, namespace, id string) (*url.URL, error) {
if len(id) == 0 {
return &url.URL{Scheme: "https"}, nil
}
destinationHost := id + "." + namespace + ".svc"
service, err := services.Services(namespace).Get(id)
if err != nil {
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}
switch {
// use IP from a clusterIP for these service types
case service.Spec.Type == v1.ServiceTypeClusterIP,
service.Spec.Type == v1.ServiceTypeNodePort,
service.Spec.Type == v1.ServiceTypeLoadBalancer:
return &url.URL{
Scheme: "https",
Host: service.Spec.ClusterIP,
}, nil
}
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"testing"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"net/http"
)
type serviceListerMock struct {
services []*v1.Service
err error
}
func (s *serviceListerMock) List(selector labels.Selector) (ret []*v1.Service, err error) {
return s.services, err
}
func (s *serviceListerMock) Services(namespace string) listersv1.ServiceNamespaceLister {
return nil
}
func (s *serviceListerMock) GetPodServices(pod *v1.Pod) ([]*v1.Service, error) {
return nil, nil
}
type endpointsListerMock struct {
endpoints []*v1.Endpoints
err error
}
func (e *endpointsListerMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}
func (e *endpointsListerMock) Endpoints(namespace string) listersv1.EndpointsNamespaceLister {
return endpointsNamespaceListMock{
endpoints: e.endpoints,
err: e.err,
}
}
type endpointsNamespaceListMock struct {
endpoints []*v1.Endpoints
err error
}
func (e endpointsNamespaceListMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}
func (e endpointsNamespaceListMock) Get(name string) (*v1.Endpoints, error) {
if len(e.endpoints) == 0 {
return nil, e.err
}
return e.endpoints[0], e.err
}
func TestNoEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
endpoints := &endpointsListerMock{err: errors.NewNotFound(v1.Resource("endpoints"), "dummy-svc")}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if url != nil {
t.Error("Should not have gotten back an URL")
}
if err == nil {
t.Error("Should have gotten an error")
}
se, ok := err.(*errors.StatusError)
if !ok {
t.Error("Should have gotten a status error not %T", err)
}
if se.ErrStatus.Code != http.StatusNotFound {
t.Error("Should have gotten a http 404 not %d", se.ErrStatus.Code)
}
}
func TestOneEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
address := v1.EndpointAddress{Hostname: "dummy-host", IP: "127.0.0.1"}
addresses := []v1.EndpointAddress{address}
port := v1.EndpointPort{Port: 443}
ports := []v1.EndpointPort{port}
endpoint := v1.EndpointSubset{Addresses: addresses, Ports: ports}
subsets := []v1.EndpointSubset{endpoint}
one := &v1.Endpoints{Subsets: subsets}
slice := []*v1.Endpoints{one}
endpoints := &endpointsListerMock{endpoints: slice}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if err != nil {
t.Errorf("Should not have gotten error %v", err)
}
if url == nil {
t.Error("Should not have gotten back an URL")
}
if url.Host != "127.0.0.1:443" {
t.Error("Should have gotten back a host of dummy-host not %s", url.Host)
}
}

View File

@ -65,6 +65,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/proxy:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -18,6 +18,7 @@ package apiserver
import (
"net/http"
"net/url"
"time"
"k8s.io/apimachinery/pkg/apimachinery/announced"
@ -30,8 +31,10 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/util/proxy"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/version"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
@ -70,6 +73,19 @@ func init() {
// legacyAPIServiceName is the fixed name of the only non-groupified API version
const legacyAPIServiceName = "v1."
type ServiceResolver interface {
ResolveEndpoint(namespace, name string) (*url.URL, error)
}
type aggregatorEndpointRouting struct {
services listersv1.ServiceLister
endpoints listersv1.EndpointsLister
}
type aggregatorClusterRouting struct {
services listersv1.ServiceLister
}
type Config struct {
GenericConfig *genericapiserver.Config
CoreAPIServerClient kubeclientset.Interface
@ -78,6 +94,9 @@ type Config struct {
// this to confirm the proxy's identity
ProxyClientCert []byte
ProxyClientKey []byte
// Indicates if the Aggregator should send to the cluster IP (false) or route to the endpoints IP (true)
EnableAggregatorRouting bool
}
// APIAggregator contains state for a Kubernetes cluster master/api server.
@ -104,6 +123,9 @@ type APIAggregator struct {
// provided for easier embedding
APIRegistrationInformers informers.SharedInformerFactory
// Information needed to determine routing for the aggregator
routing ServiceResolver
}
type completedConfig struct {
@ -128,6 +150,14 @@ func (c *Config) SkipComplete() completedConfig {
return completedConfig{c}
}
func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name)
}
func (r *aggregatorClusterRouting) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return proxy.ResolveCluster(r.services, namespace, name)
}
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().New("kube-aggregator", delegationTarget) // completion is done in Complete, no need for a second time
@ -145,6 +175,18 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
)
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
var routing ServiceResolver
if c.EnableAggregatorRouting {
routing = &aggregatorEndpointRouting{
services: kubeInformers.Core().V1().Services().Lister(),
endpoints: kubeInformers.Core().V1().Endpoints().Lister(),
}
} else {
routing = &aggregatorClusterRouting{
services: kubeInformers.Core().V1().Services().Lister(),
}
}
s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
@ -155,6 +197,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
routing: routing,
}
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs)
@ -204,11 +247,11 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, destinationHost string) {
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
// since they are wired against listers because they require multiple resources to respond
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
proxyHandler.updateAPIService(apiService, destinationHost)
proxyHandler.updateAPIService(apiService)
return
}
@ -224,8 +267,9 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
localDelegate: s.delegateHandler,
proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey,
routing: s.routing,
}
proxyHandler.updateAPIService(apiService, destinationHost)
proxyHandler.updateAPIService(apiService)
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

View File

@ -39,7 +39,7 @@ import (
)
type APIHandlerManager interface {
AddAPIService(apiService *apiregistration.APIService, destinationHost string)
AddAPIService(apiService *apiregistration.APIService)
RemoveAPIService(apiServiceName string)
}
@ -102,8 +102,7 @@ func (c *APIServiceRegistrationController) sync(key string) error {
return nil
}
// TODO move the destination host to status so that you can see where its going
c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService))
c.apiHandlerManager.AddAPIService(apiService)
return nil
}

View File

@ -50,6 +50,9 @@ type proxyHandler struct {
proxyClientCert []byte
proxyClientKey []byte
// Endpoints based routing to map from cluster IP to routable IP
routing ServiceResolver
handlingInfo atomic.Value
}
@ -64,8 +67,10 @@ type proxyHandlingInfo struct {
transportBuildingError error
// proxyRoundTripper is the re-useable portion of the transport. It does not vary with any request.
proxyRoundTripper http.RoundTripper
// destinationHost is the hostname of the backing API server
destinationHost string
// serviceName is the name of the service this handler proxies to
serviceName string
// namespace is the namespace the service lives in
serviceNamespace string
}
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -108,7 +113,12 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// write a new location based on the existing request pointed at the target service
location := &url.URL{}
location.Scheme = "https"
location.Host = handlingInfo.destinationHost
rloc, err := r.routing.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
if err != nil {
http.Error(w, "missing route", http.StatusInternalServerError)
return
}
location.Host = rloc.Host
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
@ -119,7 +129,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
upgrade := false
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req)
proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -175,14 +185,13 @@ func (r *responder) Error(err error) {
// these methods provide locked access to fields
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService, destinationHost string) {
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) {
if apiService.Spec.Service == nil {
r.handlingInfo.Store(proxyHandlingInfo{local: true})
return
}
newInfo := proxyHandlingInfo{
destinationHost: destinationHost,
restConfig: &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify,
@ -192,6 +201,8 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic
CAData: apiService.Spec.CABundle,
},
},
serviceName: apiService.Spec.Service.Name,
serviceNamespace: apiService.Spec.Service.Namespace,
}
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
r.handlingInfo.Store(newInfo)

View File

@ -31,6 +31,7 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"net/url"
)
type targetHTTPHandler struct {
@ -78,6 +79,17 @@ func (*fakeRequestContextMapper) Update(req *http.Request, context genericapireq
return nil
}
type mockedRouter struct {
destinationHost string
}
func (r *mockedRouter) ResolveEndpoint(namespace, name string) (*url.URL, error) {
return &url.URL{
Scheme: "https",
Host: r.destinationHost,
}, nil
}
func TestProxyHandler(t *testing.T) {
target := &targetHTTPHandler{}
targetServer := httptest.NewTLSServer(target)
@ -159,15 +171,15 @@ func TestProxyHandler(t *testing.T) {
func() {
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
routing: &mockedRouter{destinationHost: targetServer.Listener.Addr().String()},
}
handler.contextMapper = &fakeRequestContextMapper{user: tc.user}
server := httptest.NewServer(handler)
defer server.Close()
if tc.apiService != nil {
handler.updateAPIService(tc.apiService, tc.apiService.Spec.Service.Name+"."+tc.apiService.Spec.Service.Namespace+".svc")
handler.updateAPIService(tc.apiService)
curr := handler.handlingInfo.Load().(proxyHandlingInfo)
curr.destinationHost = targetServer.Listener.Addr().String()
handler.handlingInfo.Store(curr)
}