From 2f671340c9cbfb1817ea4d28c30298b19498089d Mon Sep 17 00:00:00 2001 From: Ted Yu Date: Thu, 15 Aug 2019 11:28:07 -0700 Subject: [PATCH] buildPortsToEndpointsMap should use flattened value type --- pkg/proxy/userspace/roundrobin.go | 54 +++--------------- pkg/proxy/userspace/roundrobin_test.go | 39 ------------- pkg/proxy/util/utils.go | 25 +++++++++ pkg/proxy/util/utils_test.go | 67 +++++++++++++++++++++++ pkg/proxy/winuserspace/BUILD | 1 + pkg/proxy/winuserspace/roundrobin.go | 54 +++--------------- pkg/proxy/winuserspace/roundrobin_test.go | 39 ------------- 7 files changed, 107 insertions(+), 172 deletions(-) diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index 17ba9e21649..643e5c43033 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -21,7 +21,6 @@ import ( "fmt" "net" "reflect" - "strconv" "sync" "time" @@ -29,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/slice" ) @@ -188,28 +188,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne return endpoint, nil } -type hostPortPair struct { - host string - port int -} - -func isValidEndpoint(hpp *hostPortPair) bool { - return hpp.host != "" && hpp.port > 0 -} - -func flattenValidEndpoints(endpoints []hostPortPair) []string { - // Convert Endpoint objects into strings for easier use later. Ignore - // the protocol field - we'll get that from the Service objects. - var result []string - for i := range endpoints { - hpp := &endpoints[i] - if isValidEndpoint(hpp) { - result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) - } - } - return result -} - // Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) { for _, affinity := range state.affinity.affinityMap { @@ -243,33 +221,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn } } -// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that -// portname. Expode Endpoints.Subsets[*] into this structure. -func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair { - portsToEndpoints := map[string][]hostPortPair{} - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) - // Ignore the protocol field - we'll get that from the Service objects. - } - } - } - return portsToEndpoints -} - func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) { - portsToEndpoints := buildPortsToEndpointsMap(endpoints) + portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints) lb.lock.Lock() defer lb.lock.Unlock() for portname := range portsToEndpoints { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + newEndpoints := portsToEndpoints[portname] state, exists := lb.services[svcPort] if !exists || state == nil || len(newEndpoints) > 0 { @@ -289,8 +249,8 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) { } func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { - portsToEndpoints := buildPortsToEndpointsMap(endpoints) - oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints) + portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints) + oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints) registeredEndpoints := make(map[proxy.ServicePortName]bool) lb.lock.Lock() @@ -298,7 +258,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint for portname := range portsToEndpoints { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + newEndpoints := portsToEndpoints[portname] state, exists := lb.services[svcPort] curEndpoints := []string{} @@ -344,7 +304,7 @@ func (lb *LoadBalancerRR) resetService(svcPort proxy.ServicePortName) { } func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) { - portsToEndpoints := buildPortsToEndpointsMap(endpoints) + portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints) lb.lock.Lock() defer lb.lock.Unlock() diff --git a/pkg/proxy/userspace/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go index 01b711e0d46..315a2ea9865 100644 --- a/pkg/proxy/userspace/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -26,45 +26,6 @@ import ( "k8s.io/kubernetes/pkg/proxy" ) -func TestValidateWorks(t *testing.T) { - if isValidEndpoint(&hostPortPair{}) { - t.Errorf("Didn't fail for empty set") - } - if isValidEndpoint(&hostPortPair{host: "foobar"}) { - t.Errorf("Didn't fail with invalid port") - } - if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) { - t.Errorf("Didn't fail with a negative port") - } - if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) { - t.Errorf("Failed a valid config.") - } -} - -func TestFilterWorks(t *testing.T) { - endpoints := []hostPortPair{ - {host: "foobar", port: 1}, - {host: "foobar", port: 2}, - {host: "foobar", port: -1}, - {host: "foobar", port: 3}, - {host: "foobar", port: -2}, - } - filtered := flattenValidEndpoints(endpoints) - - if len(filtered) != 3 { - t.Errorf("Failed to filter to the correct size") - } - if filtered[0] != "foobar:1" { - t.Errorf("Index zero is not foobar:1") - } - if filtered[1] != "foobar:2" { - t.Errorf("Index one is not foobar:2") - } - if filtered[2] != "foobar:3" { - t.Errorf("Index two is not foobar:3") - } -} - func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index d390c7f5b0a..fd411c9c959 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "net" + "strconv" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -48,6 +49,30 @@ var ( ErrNoAddresses = errors.New("No addresses for hostname") ) +// isValidEndpoint checks that the given host / port pair are valid endpoint +func isValidEndpoint(host string, port int) bool { + return host != "" && port > 0 +} + +// BuildPortsToEndpointsMap builds a map of portname -> all ip:ports for that +// portname. Explode Endpoints.Subsets[*] into this structure. +func BuildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]string { + portsToEndpoints := map[string][]string{} + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + if isValidEndpoint(addr.IP, int(port.Port)) { + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port)))) + } + } + } + } + return portsToEndpoints +} + // IsZeroCIDR checks whether the input CIDR string is either // the IPv4 or IPv6 zero CIDR func IsZeroCIDR(cidr string) bool { diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index 9abf40fed4e..644b0e31888 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -19,6 +19,7 @@ package util import ( "context" "net" + "reflect" "testing" "k8s.io/api/core/v1" @@ -28,6 +29,72 @@ import ( fake "k8s.io/kubernetes/pkg/proxy/util/testing" ) +func TestValidateWorks(t *testing.T) { + if isValidEndpoint("", 0) { + t.Errorf("Didn't fail for empty set") + } + if isValidEndpoint("foobar", 0) { + t.Errorf("Didn't fail with invalid port") + } + if isValidEndpoint("foobar", -1) { + t.Errorf("Didn't fail with a negative port") + } + if !isValidEndpoint("foobar", 8080) { + t.Errorf("Failed a valid config.") + } +} + +func TestBuildPortsToEndpointsMap(t *testing.T) { + endpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "testnamespace"}, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + {IP: "10.0.0.1"}, + {IP: "10.0.0.2"}, + }, + Ports: []v1.EndpointPort{ + {Name: "http", Port: 80}, + {Name: "https", Port: 443}, + }, + }, + { + Addresses: []v1.EndpointAddress{ + {IP: "10.0.0.1"}, + {IP: "10.0.0.3"}, + }, + Ports: []v1.EndpointPort{ + {Name: "http", Port: 8080}, + {Name: "dns", Port: 53}, + }, + }, + { + Addresses: []v1.EndpointAddress{}, + Ports: []v1.EndpointPort{ + {Name: "http", Port: 8888}, + {Name: "ssh", Port: 22}, + }, + }, + { + Addresses: []v1.EndpointAddress{ + {IP: "10.0.0.1"}, + }, + Ports: []v1.EndpointPort{}, + }, + }, + } + expectedPortsToEndpoints := map[string][]string{ + "http": {"10.0.0.1:80", "10.0.0.2:80", "10.0.0.1:8080", "10.0.0.3:8080"}, + "https": {"10.0.0.1:443", "10.0.0.2:443"}, + "dns": {"10.0.0.1:53", "10.0.0.3:53"}, + } + + portsToEndpoints := BuildPortsToEndpointsMap(endpoints) + if !reflect.DeepEqual(expectedPortsToEndpoints, portsToEndpoints) { + t.Errorf("expected ports to endpoints not seen") + } +} + func TestIsProxyableIP(t *testing.T) { testCases := []struct { ip string diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index 8bad75c386b..b8ef1e38a62 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/config:go_default_library", + "//pkg/proxy/util:go_default_library", "//pkg/util/ipconfig:go_default_library", "//pkg/util/netsh:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go index a712ed60bd1..efa93d2425a 100644 --- a/pkg/proxy/winuserspace/roundrobin.go +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -21,7 +21,6 @@ import ( "fmt" "net" "reflect" - "strconv" "sync" "time" @@ -29,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/slice" ) @@ -178,28 +178,6 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne return endpoint, nil } -type hostPortPair struct { - host string - port int -} - -func isValidEndpoint(hpp *hostPortPair) bool { - return hpp.host != "" && hpp.port > 0 -} - -func flattenValidEndpoints(endpoints []hostPortPair) []string { - // Convert Endpoint objects into strings for easier use later. Ignore - // the protocol field - we'll get that from the Service objects. - var result []string - for i := range endpoints { - hpp := &endpoints[i] - if isValidEndpoint(hpp) { - result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) - } - } - return result -} - // Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) { for _, affinity := range state.affinity.affinityMap { @@ -233,33 +211,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn } } -// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that -// portname. Explode Endpoints.Subsets[*] into this structure. -func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]hostPortPair { - portsToEndpoints := map[string][]hostPortPair{} - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) - // Ignore the protocol field - we'll get that from the Service objects. - } - } - } - return portsToEndpoints -} - func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) { - portsToEndpoints := buildPortsToEndpointsMap(endpoints) + portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints) lb.lock.Lock() defer lb.lock.Unlock() for portname := range portsToEndpoints { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + newEndpoints := portsToEndpoints[portname] state, exists := lb.services[svcPort] if !exists || state == nil || len(newEndpoints) > 0 { @@ -279,8 +239,8 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) { } func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { - portsToEndpoints := buildPortsToEndpointsMap(endpoints) - oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints) + portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints) + oldPortsToEndpoints := util.BuildPortsToEndpointsMap(oldEndpoints) registeredEndpoints := make(map[proxy.ServicePortName]bool) lb.lock.Lock() @@ -288,7 +248,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint for portname := range portsToEndpoints { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + newEndpoints := portsToEndpoints[portname] state, exists := lb.services[svcPort] curEndpoints := []string{} @@ -326,7 +286,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint } func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) { - portsToEndpoints := buildPortsToEndpointsMap(endpoints) + portsToEndpoints := util.BuildPortsToEndpointsMap(endpoints) lb.lock.Lock() defer lb.lock.Unlock() diff --git a/pkg/proxy/winuserspace/roundrobin_test.go b/pkg/proxy/winuserspace/roundrobin_test.go index c002ecf34da..f6cce8cff8e 100644 --- a/pkg/proxy/winuserspace/roundrobin_test.go +++ b/pkg/proxy/winuserspace/roundrobin_test.go @@ -26,45 +26,6 @@ import ( "k8s.io/kubernetes/pkg/proxy" ) -func TestValidateWorks(t *testing.T) { - if isValidEndpoint(&hostPortPair{}) { - t.Errorf("Didn't fail for empty set") - } - if isValidEndpoint(&hostPortPair{host: "foobar"}) { - t.Errorf("Didn't fail with invalid port") - } - if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) { - t.Errorf("Didn't fail with a negative port") - } - if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) { - t.Errorf("Failed a valid config.") - } -} - -func TestFilterWorks(t *testing.T) { - endpoints := []hostPortPair{ - {host: "foobar", port: 1}, - {host: "foobar", port: 2}, - {host: "foobar", port: -1}, - {host: "foobar", port: 3}, - {host: "foobar", port: -2}, - } - filtered := flattenValidEndpoints(endpoints) - - if len(filtered) != 3 { - t.Errorf("Failed to filter to the correct size") - } - if filtered[0] != "foobar:1" { - t.Errorf("Index zero is not foobar:1") - } - if filtered[1] != "foobar:2" { - t.Errorf("Index one is not foobar:2") - } - if filtered[2] != "foobar:3" { - t.Errorf("Index two is not foobar:3") - } -} - func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}