From d56759452cccd34c33d987d001092128d0b0f2ff Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 30 May 2017 12:48:35 +0200 Subject: [PATCH] aggregation: unify mode implementations --- .../k8s.io/apiserver/pkg/util/proxy/proxy.go | 90 +++++------ .../apiserver/pkg/util/proxy/proxy_test.go | 152 ++++++++---------- 2 files changed, 111 insertions(+), 131 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go index 925370d59e0..afad35f03b7 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go @@ -24,48 +24,53 @@ import ( "strconv" "k8s.io/apimachinery/pkg/api/errors" - utilnet "k8s.io/apimachinery/pkg/util/net" listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/pkg/api/v1" + + "k8s.io/apimachinery/pkg/util/intstr" ) +// findServicePort finds the service port by name or numerically. +func findServicePort(svc *v1.Service, port intstr.IntOrString) (*v1.ServicePort, error) { + for _, svcPort := range svc.Spec.Ports { + if (port.Type == intstr.Int && int32(svcPort.Port) == port.IntVal) || (port.Type == intstr.String && svcPort.Name == port.StrVal) { + return &svcPort, nil + } + } + return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %q found for service %q", port.String(), svc.Name)) +} + // ResourceLocation returns a URL to which one can send traffic for the specified service. func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string) (*url.URL, error) { - // Allow ID as "svcname", "svcname:port", or "scheme:svcname:port". - svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id) - if !valid { - return nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) + svc, err := services.Services(namespace).Get(id) + if err != nil { + return nil, err } - // If a port *number* was specified, find the corresponding service port name - if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil { - svc, err := services.Services(namespace).Get(svcName) - if err != nil { - return nil, err - } - found := false - for _, svcPort := range svc.Spec.Ports { - if int64(svcPort.Port) == portNum { - // use the declared port's name - portStr = svcPort.Name - found = true - break - } - } - if !found { - return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName)) - } + port := intstr.FromInt(443) + svcPort, err := findServicePort(svc, port) + if err != nil { + return nil, err } - eps, err := endpoints.Endpoints(namespace).Get(svcName) + switch { + case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort: + // these are fine + default: + return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type) + } + + eps, err := endpoints.Endpoints(namespace).Get(svc.Name) if err != nil { return nil, err } if len(eps.Subsets) == 0 { - return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName)) + return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name)) } + // Pick a random Subset to start searching from. ssSeed := rand.Intn(len(eps.Subsets)) + // Find a Subset that has the port. for ssi := 0; ssi < len(eps.Subsets); ssi++ { ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)] @@ -73,12 +78,12 @@ func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.Endpo continue } for i := range ss.Ports { - if ss.Ports[i].Name == portStr { + if ss.Ports[i].Name == svcPort.Name { // Pick a random address. ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP port := int(ss.Ports[i].Port) return &url.URL{ - Scheme: svcScheme, + Scheme: "https", Host: net.JoinHostPort(ip, strconv.Itoa(port)), }, nil } @@ -88,30 +93,25 @@ func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.Endpo } func ResolveCluster(services listersv1.ServiceLister, namespace, id string) (*url.URL, error) { - if len(id) == 0 { - return &url.URL{Scheme: "https"}, nil + svc, err := services.Services(namespace).Get(id) + if err != nil { + return nil, err } - destinationHost := id + "." + namespace + ".svc" - service, err := services.Services(namespace).Get(id) - if err != nil { - return &url.URL{ - Scheme: "https", - Host: destinationHost, - }, nil - } + port := intstr.FromInt(443) + switch { // use IP from a clusterIP for these service types - case service.Spec.Type == v1.ServiceTypeClusterIP, - service.Spec.Type == v1.ServiceTypeNodePort, - service.Spec.Type == v1.ServiceTypeLoadBalancer: + case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort: + svcPort, err := findServicePort(svc, port) + if err != nil { + return nil, err + } return &url.URL{ Scheme: "https", - Host: service.Spec.ClusterIP, + Host: net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprintf("%d", svcPort.Port)), }, nil + default: + return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type) } - return &url.URL{ - Scheme: "https", - Host: destinationHost, - }, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go index 4cd6fbf198e..89ad51831fe 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy_test.go @@ -25,18 +25,26 @@ import ( v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" - - "k8s.io/kube-aggregator/pkg/apis/apiregistration" ) func TestResolve(t *testing.T) { - endpoints := []*v1.Endpoints{{ - ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"}, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{Hostname: "dummy-host", IP: "127.0.0.1"}}, - Ports: []v1.EndpointPort{{Port: 443}}, - }}, - }} + matchingEndpoints := func(svc *v1.Service) []*v1.Endpoints { + ports := []v1.EndpointPort{} + for _, p := range svc.Spec.Ports { + if p.TargetPort.Type != intstr.Int { + continue + } + ports = append(ports, v1.EndpointPort{Name: p.Name, Port: p.TargetPort.IntVal}) + } + + return []*v1.Endpoints{{ + ObjectMeta: metav1.ObjectMeta{Namespace: svc.Namespace, Name: svc.Name}, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{Hostname: "dummy-host", IP: "127.0.0.1"}}, + Ports: ports, + }}, + }} + } type expectation struct { url string @@ -44,37 +52,30 @@ func TestResolve(t *testing.T) { } tests := []struct { - name string - services []*v1.Service - endpoints []*v1.Endpoints - apiService *apiregistration.APIService + name string + services []*v1.Service + endpoints func(svc *v1.Service) []*v1.Endpoints clusterMode expectation endpointMode expectation }{ { - name: "cluster ip without ports", + name: "cluster ip without 443 port", services: []*v1.Service{ { ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "alfa"}, Spec: v1.ServiceSpec{ Type: v1.ServiceTypeClusterIP, ClusterIP: "hit", + Ports: []v1.ServicePort{ + {Port: 1234, TargetPort: intstr.FromInt(1234)}, + }, }, }, }, - endpoints: endpoints, // TODO: do we have endpoints without ports? - apiService: &apiregistration.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: "v1."}, - Spec: apiregistration.APIServiceSpec{ - Service: &apiregistration.ServiceReference{ - Namespace: "one", - Name: "alfa", - }, - }, - }, + endpoints: matchingEndpoints, - clusterMode: expectation{url: "https://hit"}, // TODO: this should be an error as well + clusterMode: expectation{error: true}, endpointMode: expectation{error: true}, }, { @@ -85,22 +86,17 @@ func TestResolve(t *testing.T) { Spec: v1.ServiceSpec{ Type: v1.ServiceTypeClusterIP, ClusterIP: "hit", + Ports: []v1.ServicePort{ + {Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)}, + {Port: 1234, TargetPort: intstr.FromInt(1234)}, + }, }, }, }, - endpoints: endpoints, - apiService: &apiregistration.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: "v1."}, - Spec: apiregistration.APIServiceSpec{ - Service: &apiregistration.ServiceReference{ - Namespace: "one", - Name: "alfa", - }, - }, - }, + endpoints: matchingEndpoints, - clusterMode: expectation{url: "https://hit"}, - endpointMode: expectation{url: "https://127.0.0.1"}, + clusterMode: expectation{url: "https://hit:443"}, + endpointMode: expectation{url: "https://127.0.0.1:1443"}, }, { name: "cluster ip without endpoints", @@ -110,21 +106,16 @@ func TestResolve(t *testing.T) { Spec: v1.ServiceSpec{ Type: v1.ServiceTypeClusterIP, ClusterIP: "hit", + Ports: []v1.ServicePort{ + {Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)}, + {Port: 1234, TargetPort: intstr.FromInt(1234)}, + }, }, }, }, endpoints: nil, - apiService: &apiregistration.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: "v1."}, - Spec: apiregistration.APIServiceSpec{ - Service: &apiregistration.ServiceReference{ - Namespace: "one", - Name: "alfa", - }, - }, - }, - clusterMode: expectation{url: "https://hit"}, + clusterMode: expectation{url: "https://hit:443"}, endpointMode: expectation{error: true}, }, { @@ -135,22 +126,17 @@ func TestResolve(t *testing.T) { Spec: v1.ServiceSpec{ Type: v1.ServiceTypeLoadBalancer, ClusterIP: "lb", + Ports: []v1.ServicePort{ + {Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)}, + {Port: 1234, TargetPort: intstr.FromInt(1234)}, + }, }, }, }, - endpoints: nil, - apiService: &apiregistration.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: "v1."}, - Spec: apiregistration.APIServiceSpec{ - Service: &apiregistration.ServiceReference{ - Namespace: "one", - Name: "alfa", - }, - }, - }, + endpoints: matchingEndpoints, - clusterMode: expectation{url: "https://lb"}, - endpointMode: expectation{error: true}, + clusterMode: expectation{url: "https://lb:443"}, + endpointMode: expectation{url: "https://127.0.0.1:1443"}, }, { name: "node port", @@ -160,53 +146,47 @@ func TestResolve(t *testing.T) { Spec: v1.ServiceSpec{ Type: v1.ServiceTypeNodePort, ClusterIP: "np", + Ports: []v1.ServicePort{ + {Name: "https", Port: 443, TargetPort: intstr.FromInt(1443)}, + {Port: 1234, TargetPort: intstr.FromInt(1234)}, + }, }, }, }, - endpoints: nil, - apiService: &apiregistration.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: "v1."}, - Spec: apiregistration.APIServiceSpec{ - Service: &apiregistration.ServiceReference{ - Namespace: "one", - Name: "alfa", - }, - }, - }, + endpoints: matchingEndpoints, - clusterMode: expectation{url: "https://np"}, - endpointMode: expectation{error: true}, + clusterMode: expectation{url: "https://np:443"}, + endpointMode: expectation{url: "https://127.0.0.1:1443"}, }, { name: "missing service", services: nil, endpoints: nil, - apiService: &apiregistration.APIService{ - ObjectMeta: metav1.ObjectMeta{Name: "v1."}, - Spec: apiregistration.APIServiceSpec{ - Service: &apiregistration.ServiceReference{ - Namespace: "one", - Name: "alfa", - }, - }, - }, - clusterMode: expectation{url: "https://alfa.one.svc"}, // defaulting to 443 due to https:// prefix + clusterMode: expectation{error: true}, endpointMode: expectation{error: true}, }, } for _, test := range tests { - serviceCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + serviceCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) serviceLister := v1listers.NewServiceLister(serviceCache) for i := range test.services { - serviceCache.Add(test.services[i]) + if err := serviceCache.Add(test.services[i]); err != nil { + t.Fatalf("%s unexpected service add error: %v", test.name, err) + } } - endpointCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + endpointCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) endpointLister := v1listers.NewEndpointsLister(endpointCache) - for i := range test.endpoints { - endpointCache.Add(test.endpoints[i]) + if test.endpoints != nil { + for _, svc := range test.services { + for _, ep := range test.endpoints(svc) { + if err := endpointCache.Add(ep); err != nil { + t.Fatalf("%s unexpected endpoint add error: %v", test.name, err) + } + } + } } check := func(mode string, expected expectation, url *url.URL, err error) {