rename HostportHandler to HostportSyncer

This commit is contained in:
Minhan Xia 2017-01-25 16:28:03 -08:00
parent 8e318b8d9b
commit 548a6122c5
6 changed files with 28 additions and 25 deletions

View File

@ -12,7 +12,7 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"fake_iptables.go", "fake_iptables.go",
"hostport.go", "hostport_syncer.go",
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
@ -27,7 +27,7 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["hostport_test.go"], srcs = ["hostport_syncer_test.go"],
library = ":go_default_library", library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [

View File

@ -41,16 +41,18 @@ const (
kubeHostportChainPrefix string = "KUBE-HP-" kubeHostportChainPrefix string = "KUBE-HP-"
) )
type HostportHandler interface { // HostportSyncer takes a list of PodPortMappings and implements hostport all at once
// SyncHostports gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports type HostportSyncer interface {
// SyncHostports gathers all hostports on node and setup iptables rules to enable them.
// On each invocation existing ports are synced and stale rules are deleted.
SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error
// OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on // OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on
// node, sets up iptables rules enable them. And finally clean up stale hostports. // node, sets up iptables rules enable them. On each invocation existing ports are synced and stale rules are deleted.
// 'newPortMapping' must also be present in 'activePodPortMappings'. // 'newPortMapping' must also be present in 'activePodPortMappings'.
OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error
} }
// PortMapping represents a network port in a single container // PortMapping represents a network port in a container
type PortMapping struct { type PortMapping struct {
Name string Name string
HostPort int32 HostPort int32
@ -59,6 +61,7 @@ type PortMapping struct {
HostIP string HostIP string
} }
// PodPortMapping represents a pod's network state and associated container port mappings
type PodPortMapping struct { type PodPortMapping struct {
Namespace string Namespace string
Name string Name string
@ -69,15 +72,15 @@ type PodPortMapping struct {
type hostportOpener func(*hostport) (closeable, error) type hostportOpener func(*hostport) (closeable, error)
type handler struct { type hostportSyncer struct {
hostPortMap map[hostport]closeable hostPortMap map[hostport]closeable
iptables utiliptables.Interface iptables utiliptables.Interface
portOpener hostportOpener portOpener hostportOpener
} }
func NewHostportHandler() HostportHandler { func NewHostportSyncer() HostportSyncer {
iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4)
return &handler{ return &hostportSyncer{
hostPortMap: make(map[hostport]closeable), hostPortMap: make(map[hostport]closeable),
iptables: iptInterface, iptables: iptInterface,
portOpener: openLocalPort, portOpener: openLocalPort,
@ -103,12 +106,12 @@ func (hp *hostport) String() string {
} }
//openPodHostports opens all hostport for pod and returns the map of hostport and socket //openPodHostports opens all hostport for pod and returns the map of hostport and socket
func (h *handler) openHostports(podHostportMapping *PodPortMapping) error { func (h *hostportSyncer) openHostports(podHostportMapping *PodPortMapping) error {
var retErr error var retErr error
ports := make(map[hostport]closeable) ports := make(map[hostport]closeable)
for _, port := range podHostportMapping.PortMappings { for _, port := range podHostportMapping.PortMappings {
if port.HostPort <= 0 { if port.HostPort <= 0 {
// Ignore // Assume hostport is not specified in this portmapping. So skip
continue continue
} }
hp := hostport{ hp := hostport{
@ -186,7 +189,7 @@ func hostportChainName(pm *PortMapping, podFullName string) utiliptables.Chain {
// OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on // OpenPodHostportsAndSync opens hostports for a new PodPortMapping, gathers all hostports on
// node, sets up iptables rules enable them. And finally clean up stale hostports. // node, sets up iptables rules enable them. And finally clean up stale hostports.
// 'newPortMapping' must also be present in 'activePodPortMappings'. // 'newPortMapping' must also be present in 'activePodPortMappings'.
func (h *handler) OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error { func (h *hostportSyncer) OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error {
// try to open pod host port if specified // try to open pod host port if specified
if err := h.openHostports(newPortMapping); err != nil { if err := h.openHostports(newPortMapping); err != nil {
return err return err
@ -208,7 +211,7 @@ func (h *handler) OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInt
} }
// SyncHostports gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports // SyncHostports gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports
func (h *handler) SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error { func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMappings []*PodPortMapping) error {
start := time.Now() start := time.Now()
defer func() { defer func() {
glog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) glog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
@ -377,7 +380,7 @@ func openLocalPort(hp *hostport) (closeable, error) {
} }
// cleanupHostportMap closes obsolete hostports // cleanupHostportMap closes obsolete hostports
func (h *handler) cleanupHostportMap(containerPortMap map[*PortMapping]targetPod) { func (h *hostportSyncer) cleanupHostportMap(containerPortMap map[*PortMapping]targetPod) {
// compute hostports that are supposed to be open // compute hostports that are supposed to be open
currentHostports := make(map[hostport]bool) currentHostports := make(map[hostport]bool)
for containerPort := range containerPortMap { for containerPort := range containerPortMap {

View File

@ -54,7 +54,7 @@ type ruleMatch struct {
func TestOpenPodHostports(t *testing.T) { func TestOpenPodHostports(t *testing.T) {
fakeIPTables := NewFakeIPTables() fakeIPTables := NewFakeIPTables()
h := &handler{ h := &hostportSyncer{
hostPortMap: make(map[hostport]closeable), hostPortMap: make(map[hostport]closeable),
iptables: fakeIPTables, iptables: fakeIPTables,
portOpener: openFakeSocket, portOpener: openFakeSocket,

View File

@ -22,17 +22,17 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network/hostport" "k8s.io/kubernetes/pkg/kubelet/network/hostport"
) )
type fakeHandler struct{} type fakeSyncer struct{}
func NewFakeHostportHandler() hostport.HostportHandler { func NewFakeHostportSyncer() hostport.HostportSyncer {
return &fakeHandler{} return &fakeSyncer{}
} }
func (h *fakeHandler) OpenPodHostportsAndSync(newPortMapping *hostport.PodPortMapping, natInterfaceName string, activePortMapping []*hostport.PodPortMapping) error { func (h *fakeSyncer) OpenPodHostportsAndSync(newPortMapping *hostport.PodPortMapping, natInterfaceName string, activePortMapping []*hostport.PodPortMapping) error {
return h.SyncHostports(natInterfaceName, activePortMapping) return h.SyncHostports(natInterfaceName, activePortMapping)
} }
func (h *fakeHandler) SyncHostports(natInterfaceName string, activePortMapping []*hostport.PodPortMapping) error { func (h *fakeSyncer) SyncHostports(natInterfaceName string, activePortMapping []*hostport.PodPortMapping) error {
for _, r := range activePortMapping { for _, r := range activePortMapping {
if r.IP.To4() == nil { if r.IP.To4() == nil {
return fmt.Errorf("Invalid or missing pod %s/%s IP", r.Namespace, r.Name) return fmt.Errorf("Invalid or missing pod %s/%s IP", r.Namespace, r.Name)

View File

@ -89,7 +89,7 @@ type kubenetNetworkPlugin struct {
execer utilexec.Interface execer utilexec.Interface
nsenterPath string nsenterPath string
hairpinMode componentconfig.HairpinMode hairpinMode componentconfig.HairpinMode
hostportHandler hostport.HostportHandler hostportSyncer hostport.HostportSyncer
iptables utiliptables.Interface iptables utiliptables.Interface
sysctl utilsysctl.Interface sysctl utilsysctl.Interface
ebtables utilebtables.Interface ebtables utilebtables.Interface
@ -113,7 +113,7 @@ func NewPlugin(networkPluginDir string) network.NetworkPlugin {
iptables: iptInterface, iptables: iptInterface,
sysctl: sysctl, sysctl: sysctl,
vendorDir: networkPluginDir, vendorDir: networkPluginDir,
hostportHandler: hostport.NewHostportHandler(), hostportSyncer: hostport.NewHostportSyncer(),
nonMasqueradeCIDR: "10.0.0.0/8", nonMasqueradeCIDR: "10.0.0.0/8",
} }
} }
@ -381,7 +381,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
} }
newPodPortMapping := constructPodPortMapping(pod, ip4) newPodPortMapping := constructPodPortMapping(pod, ip4)
if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPodPortMapping, BridgeName, activePodPortMapping); err != nil { if err := plugin.hostportSyncer.OpenPodHostportsAndSync(newPodPortMapping, BridgeName, activePodPortMapping); err != nil {
return err return err
} }
@ -473,7 +473,7 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
activePodPortMapping, err := plugin.getPodPortMapping() activePodPortMapping, err := plugin.getPodPortMapping()
if err == nil { if err == nil {
err = plugin.hostportHandler.SyncHostports(BridgeName, activePodPortMapping) err = plugin.hostportSyncer.SyncHostports(BridgeName, activePodPortMapping)
} }
if err != nil { if err != nil {
errList = append(errList, err) errList = append(errList, err)

View File

@ -141,7 +141,7 @@ func TestTeardownCallsShaper(t *testing.T) {
kubenet.cniConfig = mockcni kubenet.cniConfig = mockcni
kubenet.iptables = ipttest.NewFake() kubenet.iptables = ipttest.NewFake()
kubenet.bandwidthShaper = fshaper kubenet.bandwidthShaper = fshaper
kubenet.hostportHandler = hostporttest.NewFakeHostportHandler() kubenet.hostportSyncer = hostporttest.NewFakeHostportSyncer()
mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil) mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil)