From 5fbc9e45cceea8c4b081a520e12aea402bfae323 Mon Sep 17 00:00:00 2001 From: Dane LeBlanc Date: Wed, 13 Sep 2017 10:05:56 -0400 Subject: [PATCH] Add IPv6 support to iptables proxier The following changes are proposed for the iptables proxier: * There are three places where a string specifying IP:port is parsed using something like this: if index := strings.Index(e.endpoint, ":"); index != -1 { This will fail for IPv6 since V6 addresses contain colons. Also, the V6 address is expected to be surrounded by square brackets (i.e. []:). Fix this by replacing call to Index with call to LastIndex() and stripping out square brackets. * The String() method for the localPort struct should put square brackets around IPv6 addresses. * The logging in the merge() method for proxyServiceMap should put brackets around IPv6 addresses. * There are several places where filterRules destination is hardcoded to /32. This should be a /128 for IPv6 case. * Add IPv6 unit test cases fixes #48550 --- pkg/proxy/iptables/proxier.go | 74 +++++++++++++++++++++--------- pkg/proxy/iptables/proxier_test.go | 46 +++++++++++++++++++ pkg/proxy/util/port.go | 5 +- pkg/proxy/util/port_test.go | 27 +++++++++++ 4 files changed, 130 insertions(+), 22 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6d3bcc3606f..919744d1307 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -166,12 +166,26 @@ type endpointsInfo struct { chainName utiliptables.Chain } +// Returns just the IP part of an IP or IP:port or endpoint string. If the IP +// part is an IPv6 address enclosed in brackets (e.g. "[fd00:1::5]:9999"), +// then the brackets are stripped as well. +func ipPart(s string) string { + if ip := net.ParseIP(s); ip != nil { + // IP address without port + return s + } + // Must be IP:port + ip, _, err := net.SplitHostPort(s) + if err != nil { + glog.Errorf("Error parsing '%s': %v", s, err) + return "" + } + return ip +} + // Returns just the IP part of the endpoint. func (e *endpointsInfo) IPPart() string { - if index := strings.Index(e.endpoint, ":"); index != -1 { - return e.endpoint[0:index] - } - return e.endpoint + return ipPart(e.endpoint) } // Returns the endpoint chain name for a given endpointsInfo. @@ -320,12 +334,14 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { existingPorts := sets.NewString() for svcPortName, info := range other { + port := strconv.Itoa(info.port) + clusterIPPort := net.JoinHostPort(info.clusterIP.String(), port) existingPorts.Insert(svcPortName.Port) _, exists := (*sm)[svcPortName] if !exists { - glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) + glog.V(1).Infof("Adding new service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol) } else { - glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) + glog.V(1).Infof("Updating existing service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol) } (*sm)[svcPortName] = info } @@ -798,11 +814,15 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S for svcPortName := range endpointsMap { for _, ep := range endpointsMap[svcPortName] { if ep.isLocal { - nsn := svcPortName.NamespacedName - if localIPs[nsn] == nil { - localIPs[nsn] = sets.NewString() + // If the endpoint has a bad format, ipPart() will log an + // error and ep.IPPart() will return a null string. + if ip := ep.IPPart(); ip != "" { + nsn := svcPortName.NamespacedName + if localIPs[nsn] == nil { + localIPs[nsn] = sets.NewString() + } + localIPs[nsn].Insert(ip) } - localIPs[nsn].Insert(ep.IPPart()) // just the IP part } } } @@ -924,10 +944,7 @@ type endpointServicePair struct { } func (esp *endpointServicePair) IPPart() string { - if index := strings.Index(esp.endpoint, ":"); index != -1 { - return esp.endpoint[0:index] - } - return esp.endpoint + return ipPart(esp.endpoint) } // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we @@ -945,6 +962,16 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ } } +// hostAddress returns a host address of the form /32 for +// IPv4 and /128 for IPv6 +func hostAddress(ip net.IP) string { + len := 32 + if ip.To4() == nil { + len = 128 + } + return fmt.Sprintf("%s/%d", ip.String(), len) +} + // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // This assumes proxier.mu is NOT held @@ -1162,7 +1189,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), + "-d", hostAddress(svcInfo.clusterIP), "--dport", strconv.Itoa(svcInfo.port), ) if proxier.masqueradeAll { @@ -1216,7 +1243,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", externalIP), + "-d", hostAddress(net.ParseIP(externalIP)), "--dport", strconv.Itoa(svcInfo.port), ) // We have to SNAT packets to external IPs. @@ -1242,7 +1269,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", externalIP), + "-d", hostAddress(net.ParseIP(externalIP)), "--dport", strconv.Itoa(svcInfo.port), "-j", "REJECT", ) @@ -1268,7 +1295,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", ingress.IP), + "-d", hostAddress(net.ParseIP(ingress.IP)), "--dport", strconv.Itoa(svcInfo.port), ) // jump to service firewall chain @@ -1306,7 +1333,7 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) + writeLine(proxier.natRules, append(args, "-s", hostAddress(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...) } } @@ -1389,7 +1416,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), + "-d", hostAddress(svcInfo.clusterIP), "--dport", strconv.Itoa(svcInfo.port), "-j", "REJECT", ) @@ -1433,6 +1460,11 @@ func (proxier *Proxier) syncProxyRules() { // Now write loadbalancing & DNAT rules. n := len(endpointChains) for i, endpointChain := range endpointChains { + epIP := endpoints[i].IPPart() + if epIP == "" { + // Error parsing this endpoint has been logged. Skip to next endpoint. + continue + } // Balancing rules in the per-service chain. args = append(args[:0], []string{ "-A", string(svcChain), @@ -1456,7 +1488,7 @@ func (proxier *Proxier) syncProxyRules() { ) // Handle traffic that loops back to the originator with SNAT. writeLine(proxier.natRules, append(args, - "-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()), + "-s", hostAddress(net.ParseIP(epIP)), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index ee900e3f0d1..7ba8e13594e 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -56,6 +56,52 @@ func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expected } } +func TestIpPart(t *testing.T) { + const noError = "" + + testCases := []struct { + endpoint string + expectedIP string + expectedError string + }{ + {"1.2.3.4", "1.2.3.4", noError}, + {"1.2.3.4:9999", "1.2.3.4", noError}, + {"2001:db8::1:1", "2001:db8::1:1", noError}, + {"[2001:db8::2:2]:9999", "2001:db8::2:2", noError}, + {"1.2.3.4::9999", "", "too many colons"}, + {"1.2.3.4:[0]", "", "unexpected '[' in address"}, + } + + for _, tc := range testCases { + ip := ipPart(tc.endpoint) + if tc.expectedError == noError { + if ip != tc.expectedIP { + t.Errorf("Unexpected IP for %s: Expected: %s, Got %s", tc.endpoint, tc.expectedIP, ip) + } + } else if ip != "" { + t.Errorf("Error did not occur for %s, expected: '%s' error", tc.endpoint, tc.expectedError) + } + } +} + +func TestHostAddress(t *testing.T) { + testCases := []struct { + ip string + expectedAddr string + }{ + {"1.2.3.4", "1.2.3.4/32"}, + {"2001:db8::1:1", "2001:db8::1:1/128"}, + } + + for _, tc := range testCases { + ip := net.ParseIP(tc.ip) + addr := hostAddress(ip) + if addr != tc.expectedAddr { + t.Errorf("Unexpected host address for %s: Expected: %s, Got %s", tc.ip, tc.expectedAddr, addr) + } + } +} + func TestReadLinesFromByteBuffer(t *testing.T) { testFn := func(byteArray []byte, expected []string) { index := 0 diff --git a/pkg/proxy/util/port.go b/pkg/proxy/util/port.go index fd1a024dac8..96317b1dc82 100644 --- a/pkg/proxy/util/port.go +++ b/pkg/proxy/util/port.go @@ -18,6 +18,8 @@ package util import ( "fmt" + "net" + "strconv" "github.com/golang/glog" ) @@ -37,7 +39,8 @@ type LocalPort struct { } func (lp *LocalPort) String() string { - return fmt.Sprintf("%q (%s:%d/%s)", lp.Description, lp.IP, lp.Port, lp.Protocol) + ipPort := net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)) + return fmt.Sprintf("%q (%s/%s)", lp.Description, ipPort, lp.Protocol) } // Closeable is an interface around closing an port. diff --git a/pkg/proxy/util/port_test.go b/pkg/proxy/util/port_test.go index 7f1cb0f9e1b..f6a4fccb7f4 100644 --- a/pkg/proxy/util/port_test.go +++ b/pkg/proxy/util/port_test.go @@ -27,6 +27,33 @@ func (c *fakeClosable) Close() error { return nil } +func TestLocalPortString(t *testing.T) { + testCases := []struct { + description string + ip string + port int + protocol string + expectedStr string + }{ + {"IPv4 UDP", "1.2.3.4", 9999, "udp", "\"IPv4 UDP\" (1.2.3.4:9999/udp)"}, + {"IPv4 TCP", "5.6.7.8", 1053, "tcp", "\"IPv4 TCP\" (5.6.7.8:1053/tcp)"}, + {"IPv6 TCP", "2001:db8::1", 80, "tcp", "\"IPv6 TCP\" ([2001:db8::1]:80/tcp)"}, + } + + for _, tc := range testCases { + lp := &LocalPort{ + Description: tc.description, + IP: tc.ip, + Port: tc.port, + Protocol: tc.protocol, + } + str := lp.String() + if str != tc.expectedStr { + t.Errorf("Unexpected output for %s, expected: %s, got: %s", tc.description, tc.expectedStr, str) + } + } +} + func TestRevertPorts(t *testing.T) { testCases := []struct { replacementPorts []LocalPort