Merge pull request #46623 from sttts/sttts-aggregator-unified-modes

Automatic merge from submit-queue (batch tested with PRs 43505, 45168, 46439, 46677, 46623)

aggregator: unify resolver implementation and tests

This is https://github.com/kubernetes/kubernetes/pull/45082, but without the port support.
This commit is contained in:
Kubernetes Submit Queue 2017-06-01 05:43:46 -07:00 committed by GitHub
commit 67724439fd
6 changed files with 274 additions and 302 deletions

View File

@ -18,11 +18,12 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
@ -40,6 +41,7 @@ go_library(
"//vendor/golang.org/x/net/html:go_default_library",
"//vendor/golang.org/x/net/html/atom:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/third_party/forked/golang/netutil:go_default_library",

View File

@ -24,48 +24,53 @@ import (
"strconv"
"k8s.io/apimachinery/pkg/api/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
// findServicePort finds the service port by name or numerically.
func findServicePort(svc *v1.Service, port intstr.IntOrString) (*v1.ServicePort, error) {
for _, svcPort := range svc.Spec.Ports {
if (port.Type == intstr.Int && int32(svcPort.Port) == port.IntVal) || (port.Type == intstr.String && svcPort.Name == port.StrVal) {
return &svcPort, nil
}
}
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %q found for service %q", port.String(), svc.Name))
}
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string) (*url.URL, error) {
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
if !valid {
return nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
svc, err := services.Services(namespace).Get(id)
if err != nil {
return nil, err
}
// If a port *number* was specified, find the corresponding service port name
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
svc, err := services.Services(namespace).Get(svcName)
if err != nil {
return nil, err
}
found := false
for _, svcPort := range svc.Spec.Ports {
if int64(svcPort.Port) == portNum {
// use the declared port's name
portStr = svcPort.Name
found = true
break
}
}
if !found {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
}
port := intstr.FromInt(443)
svcPort, err := findServicePort(svc, port)
if err != nil {
return nil, err
}
eps, err := endpoints.Endpoints(namespace).Get(svcName)
switch {
case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort:
// these are fine
default:
return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
}
eps, err := endpoints.Endpoints(namespace).Get(svc.Name)
if err != nil {
return nil, err
}
if len(eps.Subsets) == 0 {
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name))
}
// Pick a random Subset to start searching from.
ssSeed := rand.Intn(len(eps.Subsets))
// Find a Subset that has the port.
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
@ -73,12 +78,12 @@ func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.Endpo
continue
}
for i := range ss.Ports {
if ss.Ports[i].Name == portStr {
if ss.Ports[i].Name == svcPort.Name {
// Pick a random address.
ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
port := int(ss.Ports[i].Port)
return &url.URL{
Scheme: svcScheme,
Scheme: "https",
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, nil
}
@ -88,30 +93,35 @@ func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.Endpo
}
func ResolveCluster(services listersv1.ServiceLister, namespace, id string) (*url.URL, error) {
if len(id) == 0 {
return &url.URL{Scheme: "https"}, nil
svc, err := services.Services(namespace).Get(id)
if err != nil {
return nil, err
}
destinationHost := id + "." + namespace + ".svc"
service, err := services.Services(namespace).Get(id)
if err != nil {
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}
port := intstr.FromInt(443)
switch {
case svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.ClusterIP == v1.ClusterIPNone:
return nil, fmt.Errorf(`cannot route to service with ClusterIP "None"`)
// use IP from a clusterIP for these service types
case service.Spec.Type == v1.ServiceTypeClusterIP,
service.Spec.Type == v1.ServiceTypeNodePort,
service.Spec.Type == v1.ServiceTypeLoadBalancer:
case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort:
svcPort, err := findServicePort(svc, port)
if err != nil {
return nil, err
}
return &url.URL{
Scheme: "https",
Host: service.Spec.ClusterIP,
Host: net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprintf("%d", svcPort.Port)),
}, nil
case svc.Spec.Type == v1.ServiceTypeExternalName:
if port.Type != intstr.Int {
return nil, fmt.Errorf("named ports not supported")
}
return &url.URL{
Scheme: "https",
Host: net.JoinHostPort(svc.Spec.ExternalName, port.String()),
}, nil
default:
return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
}
return &url.URL{
Scheme: "https",
Host: destinationHost,
}, nil
}

View File

@ -17,103 +17,227 @@ limitations under the License.
package proxy
import (
"net/url"
"testing"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
listersv1 "k8s.io/client-go/listers/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"net/http"
"k8s.io/client-go/tools/cache"
)
type serviceListerMock struct {
services []*v1.Service
err error
}
func TestResolve(t *testing.T) {
matchingEndpoints := func(svc *v1.Service) []*v1.Endpoints {
ports := []v1.EndpointPort{}
for _, p := range svc.Spec.Ports {
if p.TargetPort.Type != intstr.Int {
continue
}
ports = append(ports, v1.EndpointPort{Name: p.Name, Port: p.TargetPort.IntVal})
}
func (s *serviceListerMock) List(selector labels.Selector) (ret []*v1.Service, err error) {
return s.services, err
}
func (s *serviceListerMock) Services(namespace string) listersv1.ServiceNamespaceLister {
return nil
}
func (s *serviceListerMock) GetPodServices(pod *v1.Pod) ([]*v1.Service, error) {
return nil, nil
}
type endpointsListerMock struct {
endpoints []*v1.Endpoints
err error
}
func (e *endpointsListerMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}
func (e *endpointsListerMock) Endpoints(namespace string) listersv1.EndpointsNamespaceLister {
return endpointsNamespaceListMock{
endpoints: e.endpoints,
err: e.err,
}
}
type endpointsNamespaceListMock struct {
endpoints []*v1.Endpoints
err error
}
func (e endpointsNamespaceListMock) List(selector labels.Selector) (ret []*v1.Endpoints, err error) {
return e.endpoints, e.err
}
func (e endpointsNamespaceListMock) Get(name string) (*v1.Endpoints, error) {
if len(e.endpoints) == 0 {
return nil, e.err
}
return e.endpoints[0], e.err
}
func TestNoEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
endpoints := &endpointsListerMock{err: errors.NewNotFound(v1.Resource("endpoints"), "dummy-svc")}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if url != nil {
t.Error("Should not have gotten back an URL")
}
if err == nil {
t.Error("Should have gotten an error")
}
se, ok := err.(*errors.StatusError)
if !ok {
t.Error("Should have gotten a status error not %T", err)
}
if se.ErrStatus.Code != http.StatusNotFound {
t.Error("Should have gotten a http 404 not %d", se.ErrStatus.Code)
}
}
func TestOneEndpointNoPort(t *testing.T) {
services := &serviceListerMock{}
address := v1.EndpointAddress{Hostname: "dummy-host", IP: "127.0.0.1"}
addresses := []v1.EndpointAddress{address}
port := v1.EndpointPort{Port: 443}
ports := []v1.EndpointPort{port}
endpoint := v1.EndpointSubset{Addresses: addresses, Ports: ports}
subsets := []v1.EndpointSubset{endpoint}
one := &v1.Endpoints{Subsets: subsets}
slice := []*v1.Endpoints{one}
endpoints := &endpointsListerMock{endpoints: slice}
url, err := ResolveEndpoint(services, endpoints, "dummy-ns", "dummy-svc")
if err != nil {
t.Errorf("Should not have gotten error %v", err)
}
if url == nil {
t.Error("Should not have gotten back an URL")
}
if url.Host != "127.0.0.1:443" {
t.Error("Should have gotten back a host of dummy-host not %s", url.Host)
return []*v1.Endpoints{{
ObjectMeta: metav1.ObjectMeta{Namespace: svc.Namespace, Name: svc.Name},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{Hostname: "dummy-host", IP: "127.0.0.1"}},
Ports: ports,
}},
}}
}
type expectation struct {
url string
error bool
}
tests := []struct {
name string
services []*v1.Service
endpoints func(svc *v1.Service) []*v1.Endpoints
clusterMode expectation
endpointMode expectation
}{
{
name: "cluster ip without 443 port",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: "hit",
Ports: []v1.ServicePort{
{Port: 1234, TargetPort: intstr.FromInt(1234)},
},
},
},
},
endpoints: matchingEndpoints,
clusterMode: expectation{error: true},
endpointMode: expectation{error: true},
},
{
name: "cluster ip",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: "hit",
Ports: []v1.ServicePort{
{Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)},
{Port: 1234, TargetPort: intstr.FromInt(1234)},
},
},
},
},
endpoints: matchingEndpoints,
clusterMode: expectation{url: "https://hit:443"},
endpointMode: expectation{url: "https://127.0.0.1:1443"},
},
{
name: "cluster ip without endpoints",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: "hit",
Ports: []v1.ServicePort{
{Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)},
{Port: 1234, TargetPort: intstr.FromInt(1234)},
},
},
},
},
endpoints: nil,
clusterMode: expectation{url: "https://hit:443"},
endpointMode: expectation{error: true},
},
{
name: "none cluster ip",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: v1.ClusterIPNone,
},
},
},
endpoints: nil,
clusterMode: expectation{error: true},
endpointMode: expectation{error: true},
},
{
name: "loadbalancer",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeLoadBalancer,
ClusterIP: "lb",
Ports: []v1.ServicePort{
{Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)},
{Port: 1234, TargetPort: intstr.FromInt(1234)},
},
},
},
},
endpoints: matchingEndpoints,
clusterMode: expectation{url: "https://lb:443"},
endpointMode: expectation{url: "https://127.0.0.1:1443"},
},
{
name: "node port",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeNodePort,
ClusterIP: "np",
Ports: []v1.ServicePort{
{Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)},
{Port: 1234, TargetPort: intstr.FromInt(1234)},
},
},
},
},
endpoints: matchingEndpoints,
clusterMode: expectation{url: "https://np:443"},
endpointMode: expectation{url: "https://127.0.0.1:1443"},
},
{
name: "external name",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeExternalName,
ExternalName: "foo.bar.com",
},
},
},
endpoints: nil,
clusterMode: expectation{url: "https://foo.bar.com:443"},
endpointMode: expectation{error: true},
},
{
name: "missing service",
services: nil,
endpoints: nil,
clusterMode: expectation{error: true},
endpointMode: expectation{error: true},
},
}
for _, test := range tests {
serviceCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceLister := v1listers.NewServiceLister(serviceCache)
for i := range test.services {
if err := serviceCache.Add(test.services[i]); err != nil {
t.Fatalf("%s unexpected service add error: %v", test.name, err)
}
}
endpointCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
endpointLister := v1listers.NewEndpointsLister(endpointCache)
if test.endpoints != nil {
for _, svc := range test.services {
for _, ep := range test.endpoints(svc) {
if err := endpointCache.Add(ep); err != nil {
t.Fatalf("%s unexpected endpoint add error: %v", test.name, err)
}
}
}
}
check := func(mode string, expected expectation, url *url.URL, err error) {
switch {
case err == nil && expected.error:
t.Errorf("%s in %s mode expected error, got none", test.name, mode)
case err != nil && expected.error:
// ignore
case err != nil:
t.Errorf("%s in %s mode unexpected error: %v", test.name, mode, err)
case url.String() != expected.url:
t.Errorf("%s in %s mode expected url %q, got %q", test.name, mode, expected.url, url.String())
}
}
clusterURL, err := ResolveCluster(serviceLister, "one", "alfa")
check("cluster", test.clusterMode, clusterURL, err)
endpointURL, err := ResolveEndpoint(serviceLister, endpointLister, "one", "alfa")
check("endpoint", test.endpointMode, endpointURL, err)
}
}

View File

@ -11,7 +11,6 @@ load(
go_test(
name = "go_default_test",
srcs = [
"apiservice_controller_test.go",
"handler_apis_test.go",
"handler_proxy_test.go",
],
@ -25,8 +24,6 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",

View File

@ -106,28 +106,6 @@ func (c *APIServiceRegistrationController) sync(key string) error {
return nil
}
func (c *APIServiceRegistrationController) getDestinationHost(apiService *apiregistration.APIService) string {
if apiService.Spec.Service == nil {
return ""
}
destinationHost := apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc"
service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
if err != nil {
return destinationHost
}
switch {
// use IP from a clusterIP for these service types
case service.Spec.Type == v1.ServiceTypeClusterIP,
service.Spec.Type == v1.ServiceTypeNodePort,
service.Spec.Type == v1.ServiceTypeLoadBalancer:
return service.Spec.ClusterIP
}
// return the normal DNS name by default
return destinationHost
}
func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

View File

@ -1,139 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
func TestGetDestinationHost(t *testing.T) {
tests := []struct {
name string
services []*v1.Service
apiService *apiregistration.APIService
expected string
}{
{
name: "cluster ip",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeClusterIP,
ClusterIP: "hit",
},
},
},
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "hit",
},
{
name: "loadbalancer",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeLoadBalancer,
ClusterIP: "lb",
},
},
},
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "lb",
},
{
name: "node port",
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeNodePort,
ClusterIP: "np",
},
},
},
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "np",
},
{
name: "missing service",
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{
Namespace: "one",
Name: "alfa",
},
},
},
expected: "alfa.one.svc",
},
}
for _, test := range tests {
serviceCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceLister := v1listers.NewServiceLister(serviceCache)
c := &APIServiceRegistrationController{
serviceLister: serviceLister,
}
for i := range test.services {
serviceCache.Add(test.services[i])
}
actual := c.getDestinationHost(test.apiService)
if actual != test.expected {
t.Errorf("%s expected %v, got %v", test.name, test.expected, actual)
}
}
}