Merge pull request #80854 from aojea/hostportv6

Add IPv6 support to kubenet hostport
This commit is contained in:
Kubernetes Prow Robot 2019-11-16 04:35:41 -08:00 committed by GitHub
commit 09d142a7ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 531 additions and 42 deletions

View File

@ -40,6 +40,7 @@ type fakeTable struct {
type fakeIPTables struct { type fakeIPTables struct {
tables map[string]*fakeTable tables map[string]*fakeTable
builtinChains map[string]sets.String builtinChains map[string]sets.String
ipv6 bool
} }
func NewFakeIPTables() *fakeIPTables { func NewFakeIPTables() *fakeIPTables {
@ -50,6 +51,7 @@ func NewFakeIPTables() *fakeIPTables {
string(utiliptables.TableNAT): sets.NewString("PREROUTING", "INPUT", "OUTPUT", "POSTROUTING"), string(utiliptables.TableNAT): sets.NewString("PREROUTING", "INPUT", "OUTPUT", "POSTROUTING"),
string(utiliptables.TableMangle): sets.NewString("PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"), string(utiliptables.TableMangle): sets.NewString("PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"),
}, },
ipv6: false,
} }
} }
@ -222,7 +224,7 @@ func (f *fakeIPTables) DeleteRule(tableName utiliptables.Table, chainName utilip
} }
func (f *fakeIPTables) IsIpv6() bool { func (f *fakeIPTables) IsIpv6() bool {
return false return f.ipv6
} }
func saveChain(chain *fakeChain, data *bytes.Buffer) { func saveChain(chain *fakeChain, data *bytes.Buffer) {

View File

@ -23,7 +23,7 @@ import (
"k8s.io/klog" "k8s.io/klog"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
) )
@ -136,7 +136,11 @@ func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName
} }
if natInterfaceName != "" && natInterfaceName != "lo" { if natInterfaceName != "" && natInterfaceName != "lo" {
// Need to SNAT traffic from localhost // Need to SNAT traffic from localhost
args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} localhost := "127.0.0.0/8"
if iptables.IsIpv6() {
localhost = "::1/128"
}
args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", localhost, "-j", "MASQUERADE"}
if _, err := iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { if _, err := iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
return fmt.Errorf("failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) return fmt.Errorf("failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
} }

View File

@ -21,11 +21,12 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/base32" "encoding/base32"
"fmt" "fmt"
"net"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog" "k8s.io/klog"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
@ -82,10 +83,16 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
return nil return nil
} }
if podPortMapping.IP.To4() == nil { // IP.To16() returns nil if IP is not a valid IPv4 or IPv6 address
if podPortMapping.IP.To16() == nil {
return fmt.Errorf("invalid or missing IP of pod %s", podFullName) return fmt.Errorf("invalid or missing IP of pod %s", podFullName)
} }
podIP := podPortMapping.IP.String() podIP := podPortMapping.IP.String()
isIpv6 := utilnet.IsIPv6(podPortMapping.IP)
if isIpv6 != hm.iptables.IsIpv6() {
return fmt.Errorf("HostPortManager IP family mismatch: %v, isIPv6 - %v", podIP, isIpv6)
}
if err = ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil { if err = ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil {
return err return err
@ -142,10 +149,11 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
"-j", string(iptablesproxy.KubeMarkMasqChain)) "-j", string(iptablesproxy.KubeMarkMasqChain))
// DNAT to the podIP:containerPort // DNAT to the podIP:containerPort
hostPortBinding := net.JoinHostPort(podIP, strconv.Itoa(int(pm.ContainerPort)))
writeLine(natRules, "-A", string(chain), writeLine(natRules, "-A", string(chain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", podIP, pm.ContainerPort)) "-j", "DNAT", fmt.Sprintf("--to-destination=%s", hostPortBinding))
} }
// getHostportChain should be able to provide unique hostport chain name using hash // getHostportChain should be able to provide unique hostport chain name using hash
@ -166,7 +174,6 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
// clean up opened host port if encounter any error // clean up opened host port if encounter any error
return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)})
} }
isIpv6 := utilnet.IsIPv6(podPortMapping.IP)
// Remove conntrack entries just after adding the new iptables rules. If the conntrack entry is removed along with // Remove conntrack entries just after adding the new iptables rules. If the conntrack entry is removed along with
// the IP tables rule, it can be the case that the packets received by the node after iptables rule removal will // the IP tables rule, it can be the case that the packets received by the node after iptables rule removal will

View File

@ -23,7 +23,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/utils/exec" "k8s.io/utils/exec"
) )
@ -253,6 +253,22 @@ func TestHostportManager(t *testing.T) {
}, },
expectError: false, expectError: false,
}, },
{
mapping: &PodPortMapping{
Name: "pod3",
Namespace: "ns1",
IP: net.ParseIP("2001:beef::2"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8443,
ContainerPort: 443,
Protocol: v1.ProtocolTCP,
},
},
},
expectError: true,
},
} }
// Add Hostports // Add Hostports
@ -362,3 +378,199 @@ func TestGetHostportChain(t *testing.T) {
t.Fatal(m) t.Fatal(m)
} }
} }
func TestHostportManagerIPv6(t *testing.T) {
iptables := NewFakeIPTables()
iptables.ipv6 = true
portOpener := NewFakeSocketManager()
manager := &hostportManager{
hostPortMap: make(map[hostport]closeable),
iptables: iptables,
portOpener: portOpener.openFakeSocket,
execer: exec.New(),
}
testCases := []struct {
mapping *PodPortMapping
expectError bool
}{
{
mapping: &PodPortMapping{
Name: "pod1",
Namespace: "ns1",
IP: net.ParseIP("2001:beef::2"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8080,
ContainerPort: 80,
Protocol: v1.ProtocolTCP,
},
{
HostPort: 8081,
ContainerPort: 81,
Protocol: v1.ProtocolUDP,
},
{
HostPort: 8083,
ContainerPort: 83,
Protocol: v1.ProtocolSCTP,
},
},
},
expectError: false,
},
{
mapping: &PodPortMapping{
Name: "pod2",
Namespace: "ns1",
IP: net.ParseIP("2001:beef::3"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8082,
ContainerPort: 80,
Protocol: v1.ProtocolTCP,
},
{
HostPort: 8081,
ContainerPort: 81,
Protocol: v1.ProtocolUDP,
},
{
HostPort: 8083,
ContainerPort: 83,
Protocol: v1.ProtocolSCTP,
},
},
},
expectError: true,
},
{
mapping: &PodPortMapping{
Name: "pod3",
Namespace: "ns1",
IP: net.ParseIP("2001:beef::4"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8443,
ContainerPort: 443,
Protocol: v1.ProtocolTCP,
},
},
},
expectError: false,
},
{
mapping: &PodPortMapping{
Name: "pod4",
Namespace: "ns2",
IP: net.ParseIP("192.168.2.2"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8443,
ContainerPort: 443,
Protocol: v1.ProtocolTCP,
},
},
},
expectError: true,
},
}
// Add Hostports
for _, tc := range testCases {
err := manager.Add("id", tc.mapping, "cbr0")
if tc.expectError {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
}
// Check port opened
expectedPorts := []hostport{{8080, "tcp"}, {8081, "udp"}, {8443, "tcp"}}
openedPorts := make(map[hostport]bool)
for hp, port := range portOpener.mem {
if !port.closed {
openedPorts[hp] = true
}
}
assert.EqualValues(t, len(openedPorts), len(expectedPorts))
for _, hp := range expectedPorts {
_, ok := openedPorts[hp]
assert.EqualValues(t, true, ok)
}
// Check Iptables-save result after adding hostports
raw := bytes.NewBuffer(nil)
err := iptables.SaveInto(utiliptables.TableNAT, raw)
assert.NoError(t, err)
lines := strings.Split(string(raw.Bytes()), "\n")
expectedLines := map[string]bool{
`*nat`: true,
`:KUBE-HOSTPORTS - [0:0]`: true,
`:OUTPUT - [0:0]`: true,
`:PREROUTING - [0:0]`: true,
`:POSTROUTING - [0:0]`: true,
`:KUBE-HP-IJHALPHTORMHHPPK - [0:0]`: true,
`:KUBE-HP-63UPIDJXVRSZGSUZ - [0:0]`: true,
`:KUBE-HP-WFBOALXEP42XEMJK - [0:0]`: true,
`:KUBE-HP-XU6AWMMJYOZOFTFZ - [0:0]`: true,
"-A KUBE-HOSTPORTS -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp --dport 8443 -j KUBE-HP-WFBOALXEP42XEMJK": true,
"-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp --dport 8081 -j KUBE-HP-63UPIDJXVRSZGSUZ": true,
"-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp --dport 8080 -j KUBE-HP-IJHALPHTORMHHPPK": true,
"-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8083\" -m sctp -p sctp --dport 8083 -j KUBE-HP-XU6AWMMJYOZOFTFZ": true,
"-A OUTPUT -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true,
"-A PREROUTING -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true,
"-A POSTROUTING -m comment --comment \"SNAT for localhost access to hostports\" -o cbr0 -s ::1/128 -j MASQUERADE": true,
"-A KUBE-HP-IJHALPHTORMHHPPK -m comment --comment \"pod1_ns1 hostport 8080\" -s 2001:beef::2/32 -j KUBE-MARK-MASQ": true,
"-A KUBE-HP-IJHALPHTORMHHPPK -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp -j DNAT --to-destination [2001:beef::2]:80": true,
"-A KUBE-HP-63UPIDJXVRSZGSUZ -m comment --comment \"pod1_ns1 hostport 8081\" -s 2001:beef::2/32 -j KUBE-MARK-MASQ": true,
"-A KUBE-HP-63UPIDJXVRSZGSUZ -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp -j DNAT --to-destination [2001:beef::2]:81": true,
"-A KUBE-HP-XU6AWMMJYOZOFTFZ -m comment --comment \"pod1_ns1 hostport 8083\" -s 2001:beef::2/32 -j KUBE-MARK-MASQ": true,
"-A KUBE-HP-XU6AWMMJYOZOFTFZ -m comment --comment \"pod1_ns1 hostport 8083\" -m sctp -p sctp -j DNAT --to-destination [2001:beef::2]:83": true,
"-A KUBE-HP-WFBOALXEP42XEMJK -m comment --comment \"pod3_ns1 hostport 8443\" -s 2001:beef::4/32 -j KUBE-MARK-MASQ": true,
"-A KUBE-HP-WFBOALXEP42XEMJK -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp -j DNAT --to-destination [2001:beef::4]:443": true,
`COMMIT`: true,
}
for _, line := range lines {
if len(strings.TrimSpace(line)) > 0 {
_, ok := expectedLines[strings.TrimSpace(line)]
assert.EqualValues(t, true, ok)
}
}
// Remove all added hostports
for _, tc := range testCases {
if !tc.expectError {
err := manager.Remove("id", tc.mapping)
assert.NoError(t, err)
}
}
// Check Iptables-save result after deleting hostports
raw.Reset()
err = iptables.SaveInto(utiliptables.TableNAT, raw)
assert.NoError(t, err)
lines = strings.Split(string(raw.Bytes()), "\n")
remainingChains := make(map[string]bool)
for _, line := range lines {
if strings.HasPrefix(line, ":") {
remainingChains[strings.TrimSpace(line)] = true
}
}
expectDeletedChains := []string{"KUBE-HP-4YVONL46AKYWSKS3", "KUBE-HP-7THKRFSEH4GIIXK7", "KUBE-HP-5N7UH5JAXCVP5UJR"}
for _, chain := range expectDeletedChains {
_, ok := remainingChains[chain]
assert.EqualValues(t, false, ok)
}
// check if all ports are closed
for _, port := range portOpener.mem {
assert.EqualValues(t, true, port.closed)
}
}

View File

@ -21,15 +21,17 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/base32" "encoding/base32"
"fmt" "fmt"
"net"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnet "k8s.io/utils/net"
) )
// HostportSyncer takes a list of PodPortMappings and implements hostport all at once // HostportSyncer takes a list of PodPortMappings and implements hostport all at once
@ -118,12 +120,18 @@ func getPodFullName(pod *PodPortMapping) string {
// gatherAllHostports returns all hostports that should be presented on node, // gatherAllHostports returns all hostports that should be presented on node,
// given the list of pods running on that node and ignoring host network // given the list of pods running on that node and ignoring host network
// pods (which don't need hostport <-> container port mapping). // pods (which don't need hostport <-> container port mapping)
func gatherAllHostports(activePodPortMappings []*PodPortMapping) (map[*PortMapping]targetPod, error) { // It only returns the hosports that match the IP family passed as parameter
func gatherAllHostports(activePodPortMappings []*PodPortMapping, isIPv6 bool) (map[*PortMapping]targetPod, error) {
podHostportMap := make(map[*PortMapping]targetPod) podHostportMap := make(map[*PortMapping]targetPod)
for _, pm := range activePodPortMappings { for _, pm := range activePodPortMappings {
if pm.IP.To4() == nil { // IP.To16() returns nil if IP is not a valid IPv4 or IPv6 address
return nil, fmt.Errorf("invalid or missing pod %s IP", getPodFullName(pm)) if pm.IP.To16() == nil {
return nil, fmt.Errorf("Invalid or missing pod %s IP", getPodFullName(pm))
}
// return only entries from the same IP family
if utilnet.IsIPv6(pm.IP) != isIPv6 {
continue
} }
// should not handle hostports for hostnetwork pods // should not handle hostports for hostnetwork pods
if pm.HostNetwork { if pm.HostNetwork {
@ -191,7 +199,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
klog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) klog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
}() }()
hostportPodMap, err := gatherAllHostports(activePodPortMappings) hostportPodMap, err := gatherAllHostports(activePodPortMappings, h.iptables.IsIpv6())
if err != nil { if err != nil {
return err return err
} }
@ -227,6 +235,7 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
for port, target := range hostportPodMap { for port, target := range hostportPodMap {
protocol := strings.ToLower(string(port.Protocol)) protocol := strings.ToLower(string(port.Protocol))
hostportChain := hostportChainName(port, target.podFullName) hostportChain := hostportChainName(port, target.podFullName)
if chain, ok := existingNATChains[hostportChain]; ok { if chain, ok := existingNATChains[hostportChain]; ok {
writeBytesLine(natChains, chain) writeBytesLine(natChains, chain)
} else { } else {
@ -256,11 +265,12 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
// Create hostport chain to DNAT traffic to final destination // Create hostport chain to DNAT traffic to final destination
// IPTables will maintained the stats for this chain // IPTables will maintained the stats for this chain
hostPortBinding := net.JoinHostPort(target.podIP, strconv.Itoa(int(port.ContainerPort)))
args = []string{ args = []string{
"-A", string(hostportChain), "-A", string(hostportChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort), "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, port.ContainerPort), "-j", "DNAT", fmt.Sprintf("--to-destination=%s", hostPortBinding),
} }
writeLine(natRules, args...) writeLine(natRules, args...)
} }

View File

@ -23,7 +23,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
) )
@ -133,6 +133,23 @@ func TestOpenPodHostports(t *testing.T) {
}, },
}, },
}, },
// IPv6 pod
{
&PodPortMapping{
Name: "ipv6-test-pod",
Namespace: v1.NamespaceDefault,
IP: net.ParseIP("2001:dead::5"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 123,
ContainerPort: 654,
Protocol: v1.ProtocolTCP,
},
},
},
[]*ruleMatch{},
},
} }
activePodPortMapping := make([]*PodPortMapping, 0) activePodPortMapping := make([]*PodPortMapping, 0)
@ -225,6 +242,207 @@ func matchRule(chain *fakeChain, match string) bool {
return false return false
} }
func TestOpenPodHostportsIPv6(t *testing.T) {
fakeIPTables := NewFakeIPTables()
fakeIPTables.ipv6 = true
fakeOpener := NewFakeSocketManager()
h := &hostportSyncer{
hostPortMap: make(map[hostport]closeable),
iptables: fakeIPTables,
portOpener: fakeOpener.openFakeSocket,
}
tests := []struct {
mapping *PodPortMapping
matches []*ruleMatch
}{
// New pod that we are going to add
{
&PodPortMapping{
Name: "test-pod",
Namespace: v1.NamespaceDefault,
IP: net.ParseIP("2001:beef::2"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 4567,
ContainerPort: 80,
Protocol: v1.ProtocolTCP,
},
{
HostPort: 5678,
ContainerPort: 81,
Protocol: v1.ProtocolUDP,
},
},
},
[]*ruleMatch{
{
-1,
"KUBE-HOSTPORTS",
"-m comment --comment \"test-pod_default hostport 4567\" -m tcp -p tcp --dport 4567",
},
{
4567,
"",
"-m comment --comment \"test-pod_default hostport 4567\" -s 2001:beef::2/32 -j KUBE-MARK-MASQ",
},
{
4567,
"",
"-m comment --comment \"test-pod_default hostport 4567\" -m tcp -p tcp -j DNAT --to-destination [2001:beef::2]:80",
},
{
-1,
"KUBE-HOSTPORTS",
"-m comment --comment \"test-pod_default hostport 5678\" -m udp -p udp --dport 5678",
},
{
5678,
"",
"-m comment --comment \"test-pod_default hostport 5678\" -s 2001:beef::2/32 -j KUBE-MARK-MASQ",
},
{
5678,
"",
"-m comment --comment \"test-pod_default hostport 5678\" -m udp -p udp -j DNAT --to-destination [2001:beef::2]:81",
},
},
},
// Already running pod
{
&PodPortMapping{
Name: "another-test-pod",
Namespace: v1.NamespaceDefault,
IP: net.ParseIP("2001:beef::5"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 123,
ContainerPort: 654,
Protocol: v1.ProtocolTCP,
},
},
},
[]*ruleMatch{
{
-1,
"KUBE-HOSTPORTS",
"-m comment --comment \"another-test-pod_default hostport 123\" -m tcp -p tcp --dport 123",
},
{
123,
"",
"-m comment --comment \"another-test-pod_default hostport 123\" -s 2001:beef::5/32 -j KUBE-MARK-MASQ",
},
{
123,
"",
"-m comment --comment \"another-test-pod_default hostport 123\" -m tcp -p tcp -j DNAT --to-destination [2001:beef::5]:654",
},
},
},
// IPv4 pod
{
&PodPortMapping{
Name: "ipv4-test-pod",
Namespace: v1.NamespaceDefault,
IP: net.ParseIP("192.168.2.5"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 123,
ContainerPort: 654,
Protocol: v1.ProtocolTCP,
},
},
},
[]*ruleMatch{},
},
}
activePodPortMapping := make([]*PodPortMapping, 0)
// Fill in any match rules missing chain names
for _, test := range tests {
for _, match := range test.matches {
if match.hostport >= 0 {
found := false
for _, pm := range test.mapping.PortMappings {
if int(pm.HostPort) == match.hostport {
match.chain = string(hostportChainName(pm, getPodFullName(test.mapping)))
found = true
break
}
}
if !found {
t.Fatalf("Failed to find ContainerPort for match %d/'%s'", match.hostport, match.match)
}
}
}
activePodPortMapping = append(activePodPortMapping, test.mapping)
}
// Already running pod's host port
hp := hostport{
tests[1].mapping.PortMappings[0].HostPort,
strings.ToLower(string(tests[1].mapping.PortMappings[0].Protocol)),
}
h.hostPortMap[hp] = &fakeSocket{
tests[1].mapping.PortMappings[0].HostPort,
strings.ToLower(string(tests[1].mapping.PortMappings[0].Protocol)),
false,
}
err := h.OpenPodHostportsAndSync(tests[0].mapping, "br0", activePodPortMapping)
if err != nil {
t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err)
}
// Generic rules
genericRules := []*ruleMatch{
{-1, "POSTROUTING", "-m comment --comment \"SNAT for localhost access to hostports\" -o br0 -s ::1/128 -j MASQUERADE"},
{-1, "PREROUTING", "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS"},
{-1, "OUTPUT", "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS"},
}
for _, rule := range genericRules {
_, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain(rule.chain))
if err != nil {
t.Fatalf("Expected NAT chain %s did not exist", rule.chain)
}
if !matchRule(chain, rule.match) {
t.Fatalf("Expected %s chain rule match '%s' not found", rule.chain, rule.match)
}
}
// Pod rules
for _, test := range tests {
for _, match := range test.matches {
// Ensure chain exists
_, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain(match.chain))
if err != nil {
t.Fatalf("Expected NAT chain %s did not exist", match.chain)
}
if !matchRule(chain, match.match) {
t.Fatalf("Expected NAT chain %s rule containing '%s' not found", match.chain, match.match)
}
}
}
// Socket
hostPortMap := map[hostport]closeable{
{123, "tcp"}: &fakeSocket{123, "tcp", false},
{4567, "tcp"}: &fakeSocket{4567, "tcp", false},
{5678, "udp"}: &fakeSocket{5678, "udp", false},
}
if !reflect.DeepEqual(hostPortMap, h.hostPortMap) {
t.Fatalf("Mismatch in expected hostPortMap. Expected '%v', got '%v'", hostPortMap, h.hostPortMap)
}
}
func TestHostportChainName(t *testing.T) { func TestHostportChainName(t *testing.T) {
m := make(map[string]int) m := make(map[string]int)
chain := hostportChainName(&PortMapping{HostPort: 57119, Protocol: "TCP", ContainerPort: 57119}, "testrdma-2") chain := hostportChainName(&PortMapping{HostPort: 57119, Protocol: "TCP", ContainerPort: 57119}, "testrdma-2")

View File

@ -104,12 +104,14 @@ type kubenetNetworkPlugin struct {
// kubenet can use either hostportSyncer and hostportManager to implement hostports // kubenet can use either hostportSyncer and hostportManager to implement hostports
// Currently, if network host supports legacy features, hostportSyncer will be used, // Currently, if network host supports legacy features, hostportSyncer will be used,
// otherwise, hostportManager will be used. // otherwise, hostportManager will be used.
hostportSyncer hostport.HostportSyncer hostportSyncer hostport.HostportSyncer
hostportManager hostport.HostPortManager hostportSyncerv6 hostport.HostportSyncer
iptables utiliptables.Interface hostportManager hostport.HostPortManager
iptablesv6 utiliptables.Interface hostportManagerv6 hostport.HostPortManager
sysctl utilsysctl.Interface iptables utiliptables.Interface
ebtables utilebtables.Interface iptablesv6 utiliptables.Interface
sysctl utilsysctl.Interface
ebtables utilebtables.Interface
// binDirs is passed by kubelet cni-bin-dir parameter. // binDirs is passed by kubelet cni-bin-dir parameter.
// kubenet will search for CNI binaries in DefaultCNIDir first, then continue to binDirs. // kubenet will search for CNI binaries in DefaultCNIDir first, then continue to binDirs.
binDirs []string binDirs []string
@ -131,7 +133,9 @@ func NewPlugin(networkPluginDirs []string, cacheDir string) network.NetworkPlugi
sysctl: utilsysctl.New(), sysctl: utilsysctl.New(),
binDirs: append([]string{DefaultCNIDir}, networkPluginDirs...), binDirs: append([]string{DefaultCNIDir}, networkPluginDirs...),
hostportSyncer: hostport.NewHostportSyncer(iptInterface), hostportSyncer: hostport.NewHostportSyncer(iptInterface),
hostportSyncerv6: hostport.NewHostportSyncer(iptInterfacev6),
hostportManager: hostport.NewHostportManager(iptInterface), hostportManager: hostport.NewHostportManager(iptInterface),
hostportManagerv6: hostport.NewHostportManager(iptInterfacev6),
nonMasqueradeCIDR: "10.0.0.0/8", nonMasqueradeCIDR: "10.0.0.0/8",
cacheDir: cacheDir, cacheDir: cacheDir,
podCIDRs: make([]*net.IPNet, 0), podCIDRs: make([]*net.IPNet, 0),
@ -454,8 +458,14 @@ func (plugin *kubenetNetworkPlugin) addPortMapping(id kubecontainer.ContainerID,
IP: net.ParseIP(ip), IP: net.ParseIP(ip),
HostNetwork: false, HostNetwork: false,
} }
if err := plugin.hostportManager.Add(id.ID, pm, BridgeName); err != nil { if netutils.IsIPv6(pm.IP) {
return err if err := plugin.hostportManagerv6.Add(id.ID, pm, BridgeName); err != nil {
return err
}
} else {
if err := plugin.hostportManager.Add(id.ID, pm, BridgeName); err != nil {
return err
}
} }
} }
@ -497,37 +507,57 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
// Loopback network deletion failure should not be fatal on teardown // Loopback network deletion failure should not be fatal on teardown
if err := plugin.delContainerFromNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil { if err := plugin.delContainerFromNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
klog.Warningf("Failed to delete loopback network: %v", err) klog.Warningf("Failed to delete loopback network: %v", err)
errList = append(errList, err)
} }
// no ip dependent actions // no ip dependent actions
if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil { if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
klog.Warningf("Failed to delete %q network: %v", network.DefaultInterfaceName, err)
errList = append(errList, err) errList = append(errList, err)
} }
portMappings, err := plugin.host.GetPodPortMappings(id.ID) // If there are no IPs registered we can't teardown pod's IP dependencies
if err != nil {
errList = append(errList, err)
} else if len(portMappings) > 0 {
if err = plugin.hostportManager.Remove(id.ID, &hostport.PodPortMapping{
Namespace: namespace,
Name: name,
PortMappings: portMappings,
HostNetwork: false,
}); err != nil {
errList = append(errList, err)
}
}
iplist, exists := plugin.getCachedPodIPs(id) iplist, exists := plugin.getCachedPodIPs(id)
if !exists || len(iplist) == 0 { if !exists || len(iplist) == 0 {
klog.V(5).Infof("container %s (%s/%s) does not have recorded. ignoring teardown call", id, name, namespace) klog.V(5).Infof("container %s (%s/%s) does not have recorded. ignoring teardown call", id, name, namespace)
return nil return nil
} }
// get the list of port mappings
portMappings, err := plugin.host.GetPodPortMappings(id.ID)
if err != nil {
errList = append(errList, err)
}
// process each pod IP
for _, ip := range iplist { for _, ip := range iplist {
isV6 := netutils.IsIPv6String(ip)
klog.V(5).Infof("Removing pod port mappings from IP %s", ip)
if portMappings != nil && len(portMappings) > 0 {
if isV6 {
if err = plugin.hostportManagerv6.Remove(id.ID, &hostport.PodPortMapping{
Namespace: namespace,
Name: name,
PortMappings: portMappings,
HostNetwork: false,
}); err != nil {
errList = append(errList, err)
}
} else {
if err = plugin.hostportManager.Remove(id.ID, &hostport.PodPortMapping{
Namespace: namespace,
Name: name,
PortMappings: portMappings,
HostNetwork: false,
}); err != nil {
errList = append(errList, err)
}
}
}
klog.V(5).Infof("Removing pod IP %s from shaper for (%s/%s)", ip, name, namespace) klog.V(5).Infof("Removing pod IP %s from shaper for (%s/%s)", ip, name, namespace)
// shaper uses a cidr, but we are using a single IP. // shaper uses a cidr, but we are using a single IP.
isV6 := netutils.IsIPv6String(ip)
mask := "32" mask := "32"
if isV6 { if isV6 {
mask = "128" mask = "128"

View File

@ -31,7 +31,7 @@ import (
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/network" "k8s.io/kubernetes/pkg/kubelet/dockershim/network"
"k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/testing" mockcni "k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/testing"
hostporttest "k8s.io/kubernetes/pkg/kubelet/dockershim/network/hostport/testing" hostporttest "k8s.io/kubernetes/pkg/kubelet/dockershim/network/hostport/testing"
nettest "k8s.io/kubernetes/pkg/kubelet/dockershim/network/testing" nettest "k8s.io/kubernetes/pkg/kubelet/dockershim/network/testing"
"k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/bandwidth"
@ -58,6 +58,7 @@ func TestGetPodNetworkStatus(t *testing.T) {
podIPMap[kubecontainer.ContainerID{ID: "1"}] = utilsets.NewString("10.245.0.2") podIPMap[kubecontainer.ContainerID{ID: "1"}] = utilsets.NewString("10.245.0.2")
podIPMap[kubecontainer.ContainerID{ID: "2"}] = utilsets.NewString("10.245.0.3") podIPMap[kubecontainer.ContainerID{ID: "2"}] = utilsets.NewString("10.245.0.3")
podIPMap[kubecontainer.ContainerID{ID: "3"}] = utilsets.NewString("10.245.0.4", "2000::") podIPMap[kubecontainer.ContainerID{ID: "3"}] = utilsets.NewString("10.245.0.4", "2000::")
podIPMap[kubecontainer.ContainerID{ID: "4"}] = utilsets.NewString("2000::2")
testCases := []struct { testCases := []struct {
id string id string
@ -80,6 +81,11 @@ func TestGetPodNetworkStatus(t *testing.T) {
expectError: false, expectError: false,
expectIP: utilsets.NewString("10.245.0.4", "2000::"), expectIP: utilsets.NewString("10.245.0.4", "2000::"),
}, },
{
id: "4",
expectError: false,
expectIP: utilsets.NewString("2000::2"),
},
//not in podIP map //not in podIP map
{ {
@ -159,7 +165,7 @@ func TestTeardownCallsShaper(t *testing.T) {
} }
fhost := nettest.NewFakeHost(nil) fhost := nettest.NewFakeHost(nil)
fshaper := &bandwidth.FakeShaper{} fshaper := &bandwidth.FakeShaper{}
mockcni := &mock_cni.MockCNI{} mockcni := &mockcni.MockCNI{}
ips := make(map[kubecontainer.ContainerID]utilsets.String) ips := make(map[kubecontainer.ContainerID]utilsets.String)
kubenet := newFakeKubenetPlugin(ips, fexec, fhost) kubenet := newFakeKubenetPlugin(ips, fexec, fhost)
kubenet.loConfig = &libcni.NetworkConfig{ kubenet.loConfig = &libcni.NetworkConfig{
@ -253,7 +259,7 @@ func TestTearDownWithoutRuntime(t *testing.T) {
fhost := nettest.NewFakeHost(nil) fhost := nettest.NewFakeHost(nil)
fhost.Legacy = false fhost.Legacy = false
mockcni := &mock_cni.MockCNI{} mockcni := &mockcni.MockCNI{}
fexec := &fakeexec.FakeExec{ fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{}, CommandScript: []fakeexec.FakeCommandAction{},