Add IPv6 support to iptables proxier

The following changes are proposed for the iptables proxier:

* There are three places where a string specifying IP:port is parsed
  using something like this:

      if index := strings.Index(e.endpoint, ":"); index != -1 {

  This will fail for IPv6 since V6 addresses contain colons. Also,
  the V6 address is expected to be surrounded by square brackets
  (i.e. []:). Fix this by replacing call to Index with
  call to LastIndex() and stripping out square brackets.
* The String() method for the localPort struct should put square brackets
  around IPv6 addresses.
* The logging in the merge() method for proxyServiceMap should put brackets
  around IPv6 addresses.
* There are several places where filterRules destination is hardcoded to
  /32. This should be a /128 for IPv6 case.
* Add IPv6 unit test cases

fixes #48550
This commit is contained in:
Dane LeBlanc 2017-09-13 10:05:56 -04:00
parent db809c0eb7
commit 5fbc9e45cc
4 changed files with 130 additions and 22 deletions

View File

@ -166,12 +166,26 @@ type endpointsInfo struct {
chainName utiliptables.Chain
}
// Returns just the IP part of an IP or IP:port or endpoint string. If the IP
// part is an IPv6 address enclosed in brackets (e.g. "[fd00:1::5]:9999"),
// then the brackets are stripped as well.
func ipPart(s string) string {
if ip := net.ParseIP(s); ip != nil {
// IP address without port
return s
}
// Must be IP:port
ip, _, err := net.SplitHostPort(s)
if err != nil {
glog.Errorf("Error parsing '%s': %v", s, err)
return ""
}
return ip
}
// Returns just the IP part of the endpoint.
func (e *endpointsInfo) IPPart() string {
if index := strings.Index(e.endpoint, ":"); index != -1 {
return e.endpoint[0:index]
}
return e.endpoint
return ipPart(e.endpoint)
}
// Returns the endpoint chain name for a given endpointsInfo.
@ -320,12 +334,14 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
existingPorts := sets.NewString()
for svcPortName, info := range other {
port := strconv.Itoa(info.port)
clusterIPPort := net.JoinHostPort(info.clusterIP.String(), port)
existingPorts.Insert(svcPortName.Port)
_, exists := (*sm)[svcPortName]
if !exists {
glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
glog.V(1).Infof("Adding new service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol)
} else {
glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
glog.V(1).Infof("Updating existing service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol)
}
(*sm)[svcPortName] = info
}
@ -798,11 +814,15 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S
for svcPortName := range endpointsMap {
for _, ep := range endpointsMap[svcPortName] {
if ep.isLocal {
nsn := svcPortName.NamespacedName
if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString()
// If the endpoint has a bad format, ipPart() will log an
// error and ep.IPPart() will return a null string.
if ip := ep.IPPart(); ip != "" {
nsn := svcPortName.NamespacedName
if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString()
}
localIPs[nsn].Insert(ip)
}
localIPs[nsn].Insert(ep.IPPart()) // just the IP part
}
}
}
@ -924,10 +944,7 @@ type endpointServicePair struct {
}
func (esp *endpointServicePair) IPPart() string {
if index := strings.Index(esp.endpoint, ":"); index != -1 {
return esp.endpoint[0:index]
}
return esp.endpoint
return ipPart(esp.endpoint)
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
@ -945,6 +962,16 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
}
}
// hostAddress returns a host address of the form <ip-address>/32 for
// IPv4 and <ip-address>/128 for IPv6
func hostAddress(ip net.IP) string {
len := 32
if ip.To4() == nil {
len = 128
}
return fmt.Sprintf("%s/%d", ip.String(), len)
}
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
@ -1162,7 +1189,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"-d", hostAddress(svcInfo.clusterIP),
"--dport", strconv.Itoa(svcInfo.port),
)
if proxier.masqueradeAll {
@ -1216,7 +1243,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", externalIP),
"-d", hostAddress(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.port),
)
// We have to SNAT packets to external IPs.
@ -1242,7 +1269,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", externalIP),
"-d", hostAddress(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.port),
"-j", "REJECT",
)
@ -1268,7 +1295,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", ingress.IP),
"-d", hostAddress(net.ParseIP(ingress.IP)),
"--dport", strconv.Itoa(svcInfo.port),
)
// jump to service firewall chain
@ -1306,7 +1333,7 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host.
if allowFromNode {
writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...)
writeLine(proxier.natRules, append(args, "-s", hostAddress(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...)
}
}
@ -1389,7 +1416,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"-d", hostAddress(svcInfo.clusterIP),
"--dport", strconv.Itoa(svcInfo.port),
"-j", "REJECT",
)
@ -1433,6 +1460,11 @@ func (proxier *Proxier) syncProxyRules() {
// Now write loadbalancing & DNAT rules.
n := len(endpointChains)
for i, endpointChain := range endpointChains {
epIP := endpoints[i].IPPart()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
}
// Balancing rules in the per-service chain.
args = append(args[:0], []string{
"-A", string(svcChain),
@ -1456,7 +1488,7 @@ func (proxier *Proxier) syncProxyRules() {
)
// Handle traffic that loops back to the originator with SNAT.
writeLine(proxier.natRules, append(args,
"-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()),
"-s", hostAddress(net.ParseIP(epIP)),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {

View File

@ -56,6 +56,52 @@ func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expected
}
}
func TestIpPart(t *testing.T) {
const noError = ""
testCases := []struct {
endpoint string
expectedIP string
expectedError string
}{
{"1.2.3.4", "1.2.3.4", noError},
{"1.2.3.4:9999", "1.2.3.4", noError},
{"2001:db8::1:1", "2001:db8::1:1", noError},
{"[2001:db8::2:2]:9999", "2001:db8::2:2", noError},
{"1.2.3.4::9999", "", "too many colons"},
{"1.2.3.4:[0]", "", "unexpected '[' in address"},
}
for _, tc := range testCases {
ip := ipPart(tc.endpoint)
if tc.expectedError == noError {
if ip != tc.expectedIP {
t.Errorf("Unexpected IP for %s: Expected: %s, Got %s", tc.endpoint, tc.expectedIP, ip)
}
} else if ip != "" {
t.Errorf("Error did not occur for %s, expected: '%s' error", tc.endpoint, tc.expectedError)
}
}
}
func TestHostAddress(t *testing.T) {
testCases := []struct {
ip string
expectedAddr string
}{
{"1.2.3.4", "1.2.3.4/32"},
{"2001:db8::1:1", "2001:db8::1:1/128"},
}
for _, tc := range testCases {
ip := net.ParseIP(tc.ip)
addr := hostAddress(ip)
if addr != tc.expectedAddr {
t.Errorf("Unexpected host address for %s: Expected: %s, Got %s", tc.ip, tc.expectedAddr, addr)
}
}
}
func TestReadLinesFromByteBuffer(t *testing.T) {
testFn := func(byteArray []byte, expected []string) {
index := 0

View File

@ -18,6 +18,8 @@ package util
import (
"fmt"
"net"
"strconv"
"github.com/golang/glog"
)
@ -37,7 +39,8 @@ type LocalPort struct {
}
func (lp *LocalPort) String() string {
return fmt.Sprintf("%q (%s:%d/%s)", lp.Description, lp.IP, lp.Port, lp.Protocol)
ipPort := net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port))
return fmt.Sprintf("%q (%s/%s)", lp.Description, ipPort, lp.Protocol)
}
// Closeable is an interface around closing an port.

View File

@ -27,6 +27,33 @@ func (c *fakeClosable) Close() error {
return nil
}
func TestLocalPortString(t *testing.T) {
testCases := []struct {
description string
ip string
port int
protocol string
expectedStr string
}{
{"IPv4 UDP", "1.2.3.4", 9999, "udp", "\"IPv4 UDP\" (1.2.3.4:9999/udp)"},
{"IPv4 TCP", "5.6.7.8", 1053, "tcp", "\"IPv4 TCP\" (5.6.7.8:1053/tcp)"},
{"IPv6 TCP", "2001:db8::1", 80, "tcp", "\"IPv6 TCP\" ([2001:db8::1]:80/tcp)"},
}
for _, tc := range testCases {
lp := &LocalPort{
Description: tc.description,
IP: tc.ip,
Port: tc.port,
Protocol: tc.protocol,
}
str := lp.String()
if str != tc.expectedStr {
t.Errorf("Unexpected output for %s, expected: %s, got: %s", tc.description, tc.expectedStr, str)
}
}
}
func TestRevertPorts(t *testing.T) {
testCases := []struct {
replacementPorts []LocalPort