Merge pull request #39443 from freehan/cri-hostport

Automatic merge from submit-queue (batch tested with PRs 40111, 40368, 40342, 40274, 39443)

refactor hostport logic

Refactor hostport logic to avoid using api.Pod object directly.
This commit is contained in:
Kubernetes Submit Queue 2017-01-31 19:18:44 -08:00 committed by GitHub
commit 32d79c3461
7 changed files with 167 additions and 153 deletions

View File

@ -12,12 +12,11 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"fake_iptables.go", "fake_iptables.go",
"hostport.go", "hostport_syncer.go",
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/proxy/iptables:go_default_library", "//pkg/proxy/iptables:go_default_library",
"//pkg/util/dbus:go_default_library", "//pkg/util/dbus:go_default_library",
"//pkg/util/exec:go_default_library", "//pkg/util/exec:go_default_library",
@ -28,14 +27,12 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["hostport_test.go"], srcs = ["hostport_syncer_test.go"],
library = ":go_default_library", library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
], ],
) )

View File

@ -26,8 +26,8 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
utildbus "k8s.io/kubernetes/pkg/util/dbus" utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec" utilexec "k8s.io/kubernetes/pkg/util/exec"
@ -41,27 +41,46 @@ const (
kubeHostportChainPrefix string = "KUBE-HP-" kubeHostportChainPrefix string = "KUBE-HP-"
) )
type HostportHandler interface { // HostportSyncer takes a list of PodPortMappings and implements hostport all at once
OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error type HostportSyncer interface {
SyncHostports(natInterfaceName string, activePods []*ActivePod) error // SyncHostports gathers all hostports on node and setup iptables rules to enable them.
// On each invocation existing ports are synced and stale rules are deleted.
SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error
// OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on
// node, sets up iptables rules enable them. On each invocation existing ports are synced and stale rules are deleted.
// 'newPortMapping' must also be present in 'activePodPortMappings'.
OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error
} }
type ActivePod struct { // PortMapping represents a network port in a container
Pod *v1.Pod type PortMapping struct {
IP net.IP Name string
HostPort int32
ContainerPort int32
Protocol v1.Protocol
HostIP string
}
// PodPortMapping represents a pod's network state and associated container port mappings
type PodPortMapping struct {
Namespace string
Name string
PortMappings []*PortMapping
HostNetwork bool
IP net.IP
} }
type hostportOpener func(*hostport) (closeable, error) type hostportOpener func(*hostport) (closeable, error)
type handler struct { type hostportSyncer struct {
hostPortMap map[hostport]closeable hostPortMap map[hostport]closeable
iptables utiliptables.Interface iptables utiliptables.Interface
portOpener hostportOpener portOpener hostportOpener
} }
func NewHostportHandler() HostportHandler { func NewHostportSyncer() HostportSyncer {
iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4)
return &handler{ return &hostportSyncer{
hostPortMap: make(map[hostport]closeable), hostPortMap: make(map[hostport]closeable),
iptables: iptInterface, iptables: iptInterface,
portOpener: openLocalPort, portOpener: openLocalPort,
@ -87,35 +106,31 @@ func (hp *hostport) String() string {
} }
//openPodHostports opens all hostport for pod and returns the map of hostport and socket //openPodHostports opens all hostport for pod and returns the map of hostport and socket
func (h *handler) openHostports(pod *v1.Pod) error { func (h *hostportSyncer) openHostports(podHostportMapping *PodPortMapping) error {
var retErr error var retErr error
ports := make(map[hostport]closeable) ports := make(map[hostport]closeable)
for _, container := range pod.Spec.Containers { for _, port := range podHostportMapping.PortMappings {
for _, port := range container.Ports { if port.HostPort <= 0 {
if port.HostPort <= 0 { // Assume hostport is not specified in this portmapping. So skip
// Ignore continue
continue
}
hp := hostport{
port: port.HostPort,
protocol: strings.ToLower(string(port.Protocol)),
}
socket, err := h.portOpener(&hp)
if err != nil {
retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, kubecontainer.GetPodFullName(pod), err)
break
}
ports[hp] = socket
} }
if retErr != nil { hp := hostport{
port: port.HostPort,
protocol: strings.ToLower(string(port.Protocol)),
}
socket, err := h.portOpener(&hp)
if err != nil {
retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err)
break break
} }
ports[hp] = socket
} }
// If encounter any error, close all hostports that just got opened. // If encounter any error, close all hostports that just got opened.
if retErr != nil { if retErr != nil {
for hp, socket := range ports { for hp, socket := range ports {
if err := socket.Close(); err != nil { if err := socket.Close(); err != nil {
glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, kubecontainer.GetPodFullName(pod), err) glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, getPodFullName(podHostportMapping), err)
} }
} }
return retErr return retErr
@ -128,27 +143,28 @@ func (h *handler) openHostports(pod *v1.Pod) error {
return nil return nil
} }
func getPodFullName(pod *PodPortMapping) string {
// Use underscore as the delimiter because it is not allowed in pod name
// (DNS subdomain format), while allowed in the container name format.
return pod.Name + "_" + pod.Namespace
}
// gatherAllHostports returns all hostports that should be presented on node, // gatherAllHostports returns all hostports that should be presented on node,
// given the list of pods running on that node and ignoring host network // given the list of pods running on that node and ignoring host network
// pods (which don't need hostport <-> container port mapping). // pods (which don't need hostport <-> container port mapping).
func gatherAllHostports(activePods []*ActivePod) (map[v1.ContainerPort]targetPod, error) { func gatherAllHostports(activePodPortMapping []*PodPortMapping) (map[*PortMapping]targetPod, error) {
podHostportMap := make(map[v1.ContainerPort]targetPod) podHostportMap := make(map[*PortMapping]targetPod)
for _, r := range activePods { for _, pm := range activePodPortMapping {
if r.IP.To4() == nil { if pm.IP.To4() == nil {
return nil, fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod)) return nil, fmt.Errorf("Invalid or missing pod %s IP", getPodFullName(pm))
} }
// should not handle hostports for hostnetwork pods // should not handle hostports for hostnetwork pods
if r.Pod.Spec.HostNetwork { if pm.HostNetwork {
continue continue
} }
for _, container := range r.Pod.Spec.Containers { for _, port := range pm.PortMappings {
for _, port := range container.Ports { podHostportMap[port] = targetPod{podFullName: getPodFullName(pm), podIP: pm.IP.String()}
if port.HostPort != 0 {
podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(r.Pod), podIP: r.IP.String()}
}
}
} }
} }
return podHostportMap, nil return podHostportMap, nil
@ -164,44 +180,44 @@ func writeLine(buf *bytes.Buffer, words ...string) {
// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do // then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
// this because IPTables Chain Names must be <= 28 chars long, and the longer // this because IPTables Chain Names must be <= 28 chars long, and the longer
// they are the harder they are to read. // they are the harder they are to read.
func hostportChainName(cp v1.ContainerPort, podFullName string) utiliptables.Chain { func hostportChainName(pm *PortMapping, podFullName string) utiliptables.Chain {
hash := sha256.Sum256([]byte(string(cp.HostPort) + string(cp.Protocol) + podFullName)) hash := sha256.Sum256([]byte(string(pm.HostPort) + string(pm.Protocol) + podFullName))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16]) return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16])
} }
// OpenPodHostportsAndSync opens hostports for a new pod, gathers all hostports on // OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on
// node, sets up iptables rules enable them. And finally clean up stale hostports. // node, sets up iptables rules enable them. And finally clean up stale hostports.
// 'newPod' must also be present in 'activePods'. // 'newPortMapping' must also be present in 'activePodPortMappings'.
func (h *handler) OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error { func (h *hostportSyncer) OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error {
// try to open pod host port if specified // try to open pod host port if specified
if err := h.openHostports(newPod.Pod); err != nil { if err := h.openHostports(newPortMapping); err != nil {
return err return err
} }
// Add the new pod to active pods if it's not present. // Add the new pod to active pods if it's not present.
var found bool var found bool
for _, p := range activePods { for _, pm := range activePodPortMappings {
if p.Pod.UID == newPod.Pod.UID { if pm.Namespace == newPortMapping.Namespace && pm.Name == newPortMapping.Name {
found = true found = true
break break
} }
} }
if !found { if !found {
activePods = append(activePods, newPod) activePodPortMappings = append(activePodPortMappings, newPortMapping)
} }
return h.SyncHostports(natInterfaceName, activePods) return h.SyncHostports(natInterfaceName, activePodPortMappings)
} }
// SyncHostports gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports // SyncHostports gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports
func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod) error { func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
glog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) glog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
}() }()
containerPortMap, err := gatherAllHostports(activePods) hostportPodMap, err := gatherAllHostports(activePodPortMappings)
if err != nil { if err != nil {
return err return err
} }
@ -256,9 +272,9 @@ func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod
// Accumulate NAT chains to keep. // Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
for containerPort, target := range containerPortMap { for port, target := range hostportPodMap {
protocol := strings.ToLower(string(containerPort.Protocol)) protocol := strings.ToLower(string(port.Protocol))
hostportChain := hostportChainName(containerPort, target.podFullName) hostportChain := hostportChainName(port, target.podFullName)
if chain, ok := existingNATChains[hostportChain]; ok { if chain, ok := existingNATChains[hostportChain]; ok {
writeLine(natChains, chain) writeLine(natChains, chain)
} else { } else {
@ -270,9 +286,9 @@ func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod
// Redirect to hostport chain // Redirect to hostport chain
args := []string{ args := []string{
"-A", string(kubeHostportsChain), "-A", string(kubeHostportsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", containerPort.HostPort), "--dport", fmt.Sprintf("%d", port.HostPort),
"-j", string(hostportChain), "-j", string(hostportChain),
} }
writeLine(natRules, args...) writeLine(natRules, args...)
@ -281,7 +297,7 @@ func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod
// If the request comes from the pod that is serving the hostport, then SNAT // If the request comes from the pod that is serving the hostport, then SNAT
args = []string{ args = []string{
"-A", string(hostportChain), "-A", string(hostportChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort),
"-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain), "-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain),
} }
writeLine(natRules, args...) writeLine(natRules, args...)
@ -290,9 +306,9 @@ func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod
// IPTables will maintained the stats for this chain // IPTables will maintained the stats for this chain
args = []string{ args = []string{
"-A", string(hostportChain), "-A", string(hostportChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, port.HostPort),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, containerPort.ContainerPort), "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, port.ContainerPort),
} }
writeLine(natRules, args...) writeLine(natRules, args...)
} }
@ -321,7 +337,7 @@ func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod
return fmt.Errorf("Failed to execute iptables-restore: %v", err) return fmt.Errorf("Failed to execute iptables-restore: %v", err)
} }
h.cleanupHostportMap(containerPortMap) h.cleanupHostportMap(hostportPodMap)
return nil return nil
} }
@ -364,7 +380,7 @@ func openLocalPort(hp *hostport) (closeable, error) {
} }
// cleanupHostportMap closes obsolete hostports // cleanupHostportMap closes obsolete hostports
func (h *handler) cleanupHostportMap(containerPortMap map[v1.ContainerPort]targetPod) { func (h *hostportSyncer) cleanupHostportMap(containerPortMap map[*PortMapping]targetPod) {
// compute hostports that are supposed to be open // compute hostports that are supposed to be open
currentHostports := make(map[hostport]bool) currentHostports := make(map[hostport]bool)
for containerPort := range containerPortMap { for containerPort := range containerPortMap {

View File

@ -23,9 +23,7 @@ import (
"strings" "strings"
"testing" "testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
) )
@ -56,39 +54,36 @@ type ruleMatch struct {
func TestOpenPodHostports(t *testing.T) { func TestOpenPodHostports(t *testing.T) {
fakeIPTables := NewFakeIPTables() fakeIPTables := NewFakeIPTables()
h := &handler{ h := &hostportSyncer{
hostPortMap: make(map[hostport]closeable), hostPortMap: make(map[hostport]closeable),
iptables: fakeIPTables, iptables: fakeIPTables,
portOpener: openFakeSocket, portOpener: openFakeSocket,
} }
tests := []struct { tests := []struct {
pod *v1.Pod mapping *PodPortMapping
ip string
matches []*ruleMatch matches []*ruleMatch
}{ }{
// New pod that we are going to add // New pod that we are going to add
{ {
&v1.Pod{ &PodPortMapping{
ObjectMeta: metav1.ObjectMeta{ Name: "test-pod",
Name: "test-pod", Namespace: v1.NamespaceDefault,
Namespace: metav1.NamespaceDefault, IP: net.ParseIP("10.1.1.2"),
}, HostNetwork: false,
Spec: v1.PodSpec{ PortMappings: []*PortMapping{
Containers: []v1.Container{{ {
Ports: []v1.ContainerPort{{ HostPort: 4567,
HostPort: 4567, ContainerPort: 80,
ContainerPort: 80, Protocol: v1.ProtocolTCP,
Protocol: v1.ProtocolTCP, },
}, { {
HostPort: 5678, HostPort: 5678,
ContainerPort: 81, ContainerPort: 81,
Protocol: v1.ProtocolUDP, Protocol: v1.ProtocolUDP,
}}, },
}},
}, },
}, },
"10.1.1.2",
[]*ruleMatch{ []*ruleMatch{
{ {
-1, -1,
@ -124,22 +119,19 @@ func TestOpenPodHostports(t *testing.T) {
}, },
// Already running pod // Already running pod
{ {
&v1.Pod{ &PodPortMapping{
ObjectMeta: metav1.ObjectMeta{ Name: "another-test-pod",
Name: "another-test-pod", Namespace: v1.NamespaceDefault,
Namespace: metav1.NamespaceDefault, IP: net.ParseIP("10.1.1.5"),
}, HostNetwork: false,
Spec: v1.PodSpec{ PortMappings: []*PortMapping{
Containers: []v1.Container{{ {
Ports: []v1.ContainerPort{{ HostPort: 123,
HostPort: 123, ContainerPort: 654,
ContainerPort: 654, Protocol: v1.ProtocolTCP,
Protocol: v1.ProtocolTCP, },
}},
}},
}, },
}, },
"10.1.1.5",
[]*ruleMatch{ []*ruleMatch{
{ {
-1, -1,
@ -160,20 +152,18 @@ func TestOpenPodHostports(t *testing.T) {
}, },
} }
activePods := make([]*ActivePod, 0) activePodPortMapping := make([]*PodPortMapping, 0)
// Fill in any match rules missing chain names // Fill in any match rules missing chain names
for _, test := range tests { for _, test := range tests {
for _, match := range test.matches { for _, match := range test.matches {
if match.hostport >= 0 { if match.hostport >= 0 {
found := false found := false
for _, c := range test.pod.Spec.Containers { for _, pm := range test.mapping.PortMappings {
for _, cp := range c.Ports { if int(pm.HostPort) == match.hostport {
if int(cp.HostPort) == match.hostport { match.chain = string(hostportChainName(pm, getPodFullName(test.mapping)))
match.chain = string(hostportChainName(cp, kubecontainer.GetPodFullName(test.pod))) found = true
found = true break
break
}
} }
} }
if !found { if !found {
@ -181,24 +171,22 @@ func TestOpenPodHostports(t *testing.T) {
} }
} }
} }
activePods = append(activePods, &ActivePod{ activePodPortMapping = append(activePodPortMapping, test.mapping)
Pod: test.pod,
IP: net.ParseIP(test.ip),
})
} }
// Already running pod's host port // Already running pod's host port
hp := hostport{ hp := hostport{
tests[1].pod.Spec.Containers[0].Ports[0].HostPort, tests[1].mapping.PortMappings[0].HostPort,
strings.ToLower(string(tests[1].pod.Spec.Containers[0].Ports[0].Protocol)), strings.ToLower(string(tests[1].mapping.PortMappings[0].Protocol)),
} }
h.hostPortMap[hp] = &fakeSocket{ h.hostPortMap[hp] = &fakeSocket{
tests[1].pod.Spec.Containers[0].Ports[0].HostPort, tests[1].mapping.PortMappings[0].HostPort,
strings.ToLower(string(tests[1].pod.Spec.Containers[0].Ports[0].Protocol)), strings.ToLower(string(tests[1].mapping.PortMappings[0].Protocol)),
false, false,
} }
err := h.OpenPodHostportsAndSync(&ActivePod{Pod: tests[0].pod, IP: net.ParseIP(tests[0].ip)}, "br0", activePods) err := h.OpenPodHostportsAndSync(tests[0].mapping, "br0", activePodPortMapping)
if err != nil { if err != nil {
t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err) t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err)
} }

View File

@ -11,10 +11,7 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["fake.go"], srcs = ["fake.go"],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = ["//pkg/kubelet/network/hostport:go_default_library"],
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/network/hostport:go_default_library",
],
) )
filegroup( filegroup(

View File

@ -19,24 +19,23 @@ package testing
import ( import (
"fmt" "fmt"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network/hostport" "k8s.io/kubernetes/pkg/kubelet/network/hostport"
) )
type fakeHandler struct{} type fakeSyncer struct{}
func NewFakeHostportHandler() hostport.HostportHandler { func NewFakeHostportSyncer() hostport.HostportSyncer {
return &fakeHandler{} return &fakeSyncer{}
} }
func (h *fakeHandler) OpenPodHostportsAndSync(newPod *hostport.ActivePod, natInterfaceName string, activePods []*hostport.ActivePod) error { func (h *fakeSyncer) OpenPodHostportsAndSync(newPortMapping *hostport.PodPortMapping, natInterfaceName string, activePortMapping []*hostport.PodPortMapping) error {
return h.SyncHostports(natInterfaceName, activePods) return h.SyncHostports(natInterfaceName, activePortMapping)
} }
func (h *fakeHandler) SyncHostports(natInterfaceName string, activePods []*hostport.ActivePod) error { func (h *fakeSyncer) SyncHostports(natInterfaceName string, activePortMapping []*hostport.PodPortMapping) error {
for _, r := range activePods { for _, r := range activePortMapping {
if r.IP.To4() == nil { if r.IP.To4() == nil {
return fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod)) return fmt.Errorf("Invalid or missing pod %s/%s IP", r.Namespace, r.Name)
} }
} }

View File

@ -89,7 +89,7 @@ type kubenetNetworkPlugin struct {
execer utilexec.Interface execer utilexec.Interface
nsenterPath string nsenterPath string
hairpinMode componentconfig.HairpinMode hairpinMode componentconfig.HairpinMode
hostportHandler hostport.HostportHandler hostportSyncer hostport.HostportSyncer
iptables utiliptables.Interface iptables utiliptables.Interface
sysctl utilsysctl.Interface sysctl utilsysctl.Interface
ebtables utilebtables.Interface ebtables utilebtables.Interface
@ -113,7 +113,7 @@ func NewPlugin(networkPluginDir string) network.NetworkPlugin {
iptables: iptInterface, iptables: iptInterface,
sysctl: sysctl, sysctl: sysctl,
vendorDir: networkPluginDir, vendorDir: networkPluginDir,
hostportHandler: hostport.NewHostportHandler(), hostportSyncer: hostport.NewHostportSyncer(),
nonMasqueradeCIDR: "10.0.0.0/8", nonMasqueradeCIDR: "10.0.0.0/8",
} }
} }
@ -375,13 +375,13 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
} }
// Open any hostports the pod's containers want // Open any hostports the pod's containers want
activePods, err := plugin.getActivePods() activePodPortMapping, err := plugin.getPodPortMapping()
if err != nil { if err != nil {
return err return err
} }
newPod := &hostport.ActivePod{Pod: pod, IP: ip4} newPodPortMapping := constructPodPortMapping(pod, ip4)
if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPod, BridgeName, activePods); err != nil { if err := plugin.hostportSyncer.OpenPodHostportsAndSync(newPodPortMapping, BridgeName, activePodPortMapping); err != nil {
return err return err
} }
@ -471,9 +471,9 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
return utilerrors.NewAggregate(errList) return utilerrors.NewAggregate(errList)
} }
activePods, err := plugin.getActivePods() activePodPortMapping, err := plugin.getPodPortMapping()
if err == nil { if err == nil {
err = plugin.hostportHandler.SyncHostports(BridgeName, activePods) err = plugin.hostportSyncer.SyncHostports(BridgeName, activePodPortMapping)
} }
if err != nil { if err != nil {
errList = append(errList, err) errList = append(errList, err)
@ -589,15 +589,12 @@ func (plugin *kubenetNetworkPlugin) getNonExitedPods() ([]*kubecontainer.Pod, er
return ret, nil return ret, nil
} }
// Returns a list of pods running or ready to run on this node and each pod's IP address. func (plugin *kubenetNetworkPlugin) getPodPortMapping() ([]*hostport.PodPortMapping, error) {
// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) {
pods, err := plugin.getNonExitedPods() pods, err := plugin.getNonExitedPods()
if err != nil { if err != nil {
return nil, err return nil, err
} }
activePods := make([]*hostport.ActivePod, 0) activePodPortMappings := make([]*hostport.PodPortMapping, 0)
for _, p := range pods { for _, p := range pods {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(p) containerID, err := plugin.host.GetRuntime().GetPodContainerID(p)
if err != nil { if err != nil {
@ -612,13 +609,33 @@ func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, erro
continue continue
} }
if pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name); ok { if pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name); ok {
activePods = append(activePods, &hostport.ActivePod{ activePodPortMappings = append(activePodPortMappings, constructPodPortMapping(pod, podIP))
Pod: pod, }
IP: podIP, }
return activePodPortMappings, nil
}
func constructPodPortMapping(pod *v1.Pod, podIP net.IP) *hostport.PodPortMapping {
portMappings := make([]*hostport.PortMapping, 0)
for _, c := range pod.Spec.Containers {
for _, port := range c.Ports {
portMappings = append(portMappings, &hostport.PortMapping{
Name: port.Name,
HostPort: port.HostPort,
ContainerPort: port.ContainerPort,
Protocol: port.Protocol,
HostIP: port.HostIP,
}) })
} }
} }
return activePods, nil
return &hostport.PodPortMapping{
Namespace: pod.Namespace,
Name: pod.Name,
PortMappings: portMappings,
HostNetwork: pod.Spec.HostNetwork,
IP: podIP,
}
} }
// ipamGarbageCollection will release unused IP. // ipamGarbageCollection will release unused IP.

View File

@ -141,7 +141,7 @@ func TestTeardownCallsShaper(t *testing.T) {
kubenet.cniConfig = mockcni kubenet.cniConfig = mockcni
kubenet.iptables = ipttest.NewFake() kubenet.iptables = ipttest.NewFake()
kubenet.bandwidthShaper = fshaper kubenet.bandwidthShaper = fshaper
kubenet.hostportHandler = hostporttest.NewFakeHostportHandler() kubenet.hostportSyncer = hostporttest.NewFakeHostportSyncer()
mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil) mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil)