Merge pull request #40948 from freehan/cri-hostport

Automatic merge from submit-queue (batch tested with PRs 40873, 40948, 39580, 41065, 40815)

[CRI] Enable Hostport Feature for Dockershim

Commits:
1. Refactor common hostport util logic and add more tests

2. Add HostportManager, which can ADD/DEL individual hostports instead of performing a complete sync (a usage sketch follows below).

3. Add an interface for retrieving the portMappings information of a pod to the network Host interface.
Implement the GetPodPortMappings interface in dockerService.

4. Teach kubenet to use HostportManager
Kubernetes Submit Queue, 2017-02-08 14:14:43 -08:00, committed by GitHub
commit 42d8d4ca88
19 changed files with 1044 additions and 180 deletions
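
To make the new surface concrete, here is a minimal usage sketch of the HostPortManager interface added by this PR (this program is not part of the change; the sandbox ID, pod values, and bridge name are made up for illustration):

package main

import (
	"net"

	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)

func main() {
	// NewHostportManager wires up the real iptables implementation.
	hm := hostport.NewHostportManager()

	// Hypothetical pod requesting one TCP hostport.
	mapping := &hostport.PodPortMapping{
		Namespace: "default",
		Name:      "nginx",
		IP:        net.ParseIP("10.1.1.2"),
		PortMappings: []*hostport.PortMapping{
			{HostPort: 8080, ContainerPort: 80, Protocol: v1.ProtocolTCP},
		},
	}

	// Add opens the host port and installs the KUBE-HOSTPORTS iptables rules;
	// "cbr0" is the NAT interface name kubenet passes in this PR.
	if err := hm.Add("sandbox-id", mapping, "cbr0"); err != nil {
		panic(err)
	}

	// Remove tears the rules back down and closes the socket; it does not
	// need the pod IP, which matters when tearing down already-dead pods.
	if err := hm.Remove("sandbox-id", mapping); err != nil {
		panic(err)
	}
}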

View File

@@ -39,6 +39,7 @@ go_library(
"//pkg/kubelet/leaky:go_default_library",
"//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/network/cni:go_default_library",
"//pkg/kubelet/network/hostport:go_default_library",
"//pkg/kubelet/network/kubenet:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",

View File

@@ -25,6 +25,7 @@ import (
dockertypes "github.com/docker/engine-api/types"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/componentconfig"
internalapi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
@@ -34,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/cni"
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
"k8s.io/kubernetes/pkg/kubelet/network/kubenet"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
@@ -107,6 +109,35 @@ type NetworkPluginSettings struct {
LegacyRuntimeHost network.LegacyHost
}
// namespaceGetter is a wrapper around the dockerService that implements
// the network.NamespaceGetter interface.
type namespaceGetter struct {
ds *dockerService
}
func (n *namespaceGetter) GetNetNS(containerID string) (string, error) {
return n.ds.GetNetNS(containerID)
}
// portMappingGetter is a wrapper around the dockerService that implements
// the network.PortMappingGetter interface.
type portMappingGetter struct {
ds *dockerService
}
func (p *portMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
return p.ds.GetPodPortMappings(containerID)
}
// dockerNetworkHost implements network.Host by wrapping the legacy host passed in by the kubelet
// and the dockerService wrappers, which implement the rest of the network host interfaces.
// The legacy host methods are slated for deletion.
type dockerNetworkHost struct {
network.LegacyHost
*namespaceGetter
*portMappingGetter
}
var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey}
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
@@ -138,6 +169,7 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str
netHost := &dockerNetworkHost{
pluginSettings.LegacyRuntimeHost,
&namespaceGetter{ds},
&portMappingGetter{ds},
}
plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
if err != nil {
@@ -240,12 +272,6 @@ func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeCo
return
}
// namespaceGetter is a wrapper around the dockerService that implements
// the network.NamespaceGetter interface.
type namespaceGetter struct {
*dockerService
}
// GetNetNS returns the network namespace of the given containerID. The ID
// supplied is typically the ID of a pod sandbox. This getter doesn't try
// to map non-sandbox IDs to their respective sandboxes.
@@ -257,12 +283,24 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) {
return getNetworkNamespace(r), nil
}
// dockerNetworkHost implements network.Host by wrapping the legacy host
// passed in by the kubelet and adding NamespaceGetter methods. The legacy
// host methods are slated for deletion.
type dockerNetworkHost struct {
network.LegacyHost
*namespaceGetter
}
// GetPodPortMappings returns the port mappings of the given podSandbox ID.
func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) {
// TODO: get portmappings from docker labels for backward compatibility
checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID)
if err != nil {
return nil, err
}
portMappings := []*hostport.PortMapping{}
for _, pm := range checkpoint.Data.PortMappings {
proto := toAPIProtocol(*pm.Protocol)
portMappings = append(portMappings, &hostport.PortMapping{
HostPort: *pm.HostPort,
ContainerPort: *pm.ContainerPort,
Protocol: proto,
})
}
return portMappings, nil
}
// Start initializes and starts components in dockerService.
@@ -351,3 +389,14 @@ func (ds *dockerService) getDockerVersionFromCache() (*dockertypes.Version, erro
}
return dv, nil
}
func toAPIProtocol(protocol Protocol) v1.Protocol {
switch protocol {
case protocolTCP:
return v1.ProtocolTCP
case protocolUDP:
return v1.ProtocolUDP
}
glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol)
return v1.ProtocolTCP
}
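
For reference, a small sketch of how the protocol conversion above behaves (a hypothetical test, not part of the PR; it assumes the package's Protocol type and protocolTCP/protocolUDP constants referenced above, and that this file lives in the dockershim package):

package dockershim

import "testing"

func TestToAPIProtocolSketch(t *testing.T) {
	// Known protocols map directly to their v1 counterparts...
	if toAPIProtocol(protocolTCP) != "TCP" || toAPIProtocol(protocolUDP) != "UDP" {
		t.Fatal("unexpected protocol mapping")
	}
	// ...and anything unrecognized falls back to TCP, with a warning logged.
	if toAPIProtocol(Protocol("sctp")) != "TCP" {
		t.Fatal("expected TCP fallback for unknown protocol")
	}
}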

View File

@@ -484,7 +484,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
}
glog.Infof("Hairpin mode set to %q", klet.hairpinMode)
if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
return nil, err
} else {
klet.networkPlugin = plug

View File

@@ -20,6 +20,7 @@ go_library(
"//pkg/apis/componentconfig:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/network/hostport:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/sysctl:go_default_library",
"//vendor:github.com/golang/glog",

View File

@@ -37,6 +37,7 @@ go_test(
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/network/cni/testing:go_default_library",
"//pkg/kubelet/network/testing:go_default_library",
"//pkg/util/exec:go_default_library",
"//vendor:github.com/containernetworking/cni/pkg/types",
"//vendor:github.com/stretchr/testify/mock",

View File

@@ -39,6 +39,7 @@ import (
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/cni/testing"
networktest "k8s.io/kubernetes/pkg/kubelet/network/testing"
utilexec "k8s.io/kubernetes/pkg/util/exec"
)
@@ -111,6 +112,7 @@ func tearDownPlugin(tmpDir string) {
}
type fakeNetworkHost struct {
networktest.FakePortMappingGetter
kubeClient clientset.Interface
runtime kubecontainer.Runtime
}

View File

@@ -12,6 +12,8 @@ go_library(
name = "go_default_library",
srcs = [
"fake_iptables.go",
"hostport.go",
"hostport_manager.go",
"hostport_syncer.go",
],
tags = ["automanaged"],
@@ -22,17 +24,23 @@ go_library(
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
],
)
go_test(
name = "go_default_test",
srcs = ["hostport_syncer_test.go"],
srcs = [
"hostport_manager_test.go",
"hostport_syncer_test.go",
"hostport_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/util/iptables:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
],
)

View File

@@ -300,11 +300,22 @@ func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte,
if err != nil {
return err
}
} else if strings.HasPrefix(line, "-X") {
parts := strings.Split(line, " ")
if len(parts) < 3 {
return fmt.Errorf("Invalid iptables rule '%s'", line)
}
} else if strings.HasPrefix(line, "-I") {
parts := strings.Split(line, " ")
if len(parts) < 3 {
return fmt.Errorf("Invalid iptables rule '%s'", line)
}
chainName := utiliptables.Chain(parts[1])
rule := strings.TrimPrefix(line, fmt.Sprintf("-I %s ", chainName))
_, err := f.ensureRule(utiliptables.Prepend, tableName, chainName, rule)
if err != nil {
return err
}
} else if strings.HasPrefix(line, "-X") {
parts := strings.Split(line, " ")
if len(parts) < 2 {
return fmt.Errorf("Invalid iptables rule '%s'", line)
}
if err := f.DeleteChain(tableName, utiliptables.Chain(parts[1])); err != nil {
return err
}

View File

@ -0,0 +1,171 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hostport
import (
"fmt"
"github.com/golang/glog"
"net"
"strings"
"k8s.io/kubernetes/pkg/api/v1"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
const (
// the hostport chain
kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS"
// prefix for hostport chains
kubeHostportChainPrefix string = "KUBE-HP-"
)
// PortMapping represents a network port in a container
type PortMapping struct {
Name string
HostPort int32
ContainerPort int32
Protocol v1.Protocol
HostIP string
}
// PodPortMapping represents a pod's network state and associated container port mappings
type PodPortMapping struct {
Namespace string
Name string
PortMappings []*PortMapping
HostNetwork bool
IP net.IP
}
type hostport struct {
port int32
protocol string
}
type hostportOpener func(*hostport) (closeable, error)
type closeable interface {
Close() error
}
func openLocalPort(hp *hostport) (closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket closeable
switch hp.protocol {
case "tcp":
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
socket = listener
case "udp":
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", hp.protocol)
}
glog.V(3).Infof("Opened local port %s", hp.String())
return socket, nil
}
// openHostports opens all given hostports using the given hostportOpener.
// If it encounters any error, it cleans up and returns the error.
// If all ports are opened successfully, it returns the hostport-to-socket mapping.
// TODO: move openHostports and closeHostports into a common struct
func openHostports(portOpener hostportOpener, podPortMapping *PodPortMapping) (map[hostport]closeable, error) {
var retErr error
ports := make(map[hostport]closeable)
for _, pm := range podPortMapping.PortMappings {
if pm.HostPort <= 0 {
continue
}
hp := portMappingToHostport(pm)
socket, err := portOpener(&hp)
if err != nil {
retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", pm.HostPort, getPodFullName(podPortMapping), err)
break
}
ports[hp] = socket
}
// If we encountered any error, close all hostports that were just opened.
if retErr != nil {
for hp, socket := range ports {
if err := socket.Close(); err != nil {
glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, getPodFullName(podPortMapping), err)
}
}
return nil, retErr
}
return ports, nil
}
// portMappingToHostport creates hostport structure based on input portmapping
func portMappingToHostport(portMapping *PortMapping) hostport {
return hostport{
port: portMapping.HostPort,
protocol: strings.ToLower(string(portMapping.Protocol)),
}
}
// ensureKubeHostportChains ensures the KUBE-HOSTPORTS chain is setup correctly
func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName string) error {
glog.V(4).Info("Ensuring kubelet hostport chains")
// Ensure kubeHostportChain
if _, err := iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err)
}
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
args := []string{"-m", "comment", "--comment", "kube hostport portals",
"-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(kubeHostportsChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err)
}
}
// Need to SNAT traffic from localhost
args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"}
if _, err := iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
}
return nil
}
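
As a quick illustration of openHostports' contract (a sketch, not part of the PR, assuming a trivial opener that always succeeds), mappings without a positive HostPort are skipped and everything else is opened and returned:

package hostport

import (
	"net"
	"testing"

	"k8s.io/kubernetes/pkg/api/v1"
)

// noopSocket lets the sketch run without opening real sockets.
type noopSocket struct{}

func (noopSocket) Close() error { return nil }

func TestOpenHostportsSketch(t *testing.T) {
	opener := func(hp *hostport) (closeable, error) { return noopSocket{}, nil }
	pod := &PodPortMapping{
		Namespace: "default",
		Name:      "web",
		IP:        net.ParseIP("10.1.1.2"),
		PortMappings: []*PortMapping{
			{HostPort: 8080, ContainerPort: 80, Protocol: v1.ProtocolTCP},
			{HostPort: 0, ContainerPort: 53, Protocol: v1.ProtocolUDP}, // no hostport requested; skipped
		},
	}
	ports, err := openHostports(opener, pod)
	if err != nil {
		t.Fatal(err)
	}
	if len(ports) != 1 {
		t.Fatalf("expected 1 opened hostport, got %d", len(ports))
	}
}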

View File

@ -0,0 +1,328 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hostport
import (
"bytes"
"crypto/sha256"
"encoding/base32"
"fmt"
"strings"
"sync"
"github.com/golang/glog"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
// HostPortManager is an interface for adding and removing hostports for a given pod sandbox.
type HostPortManager interface {
// Add implements port mappings.
// id should be a unique identifier for a pod, e.g. podSandboxID.
// podPortMapping is the associated port mapping information for the pod.
// natInterfaceName is the interface that localhost uses to talk to the given pod.
Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) error
// Remove cleans up matching port mappings.
// Remove must be able to clean up port mappings without the pod IP.
Remove(id string, podPortMapping *PodPortMapping) error
}
type hostportManager struct {
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
portOpener hostportOpener
mu sync.Mutex
}
func NewHostportManager() HostPortManager {
iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4)
return &hostportManager{
hostPortMap: make(map[hostport]closeable),
iptables: iptInterface,
portOpener: openLocalPort,
}
}
func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) {
if podPortMapping == nil || podPortMapping.HostNetwork {
return nil
}
podFullName := getPodFullName(podPortMapping)
// skip if there is no hostport needed
hostportMappings := gatherHostportMappings(podPortMapping)
if len(hostportMappings) == 0 {
return nil
}
if podPortMapping.IP.To4() == nil {
return fmt.Errorf("invalid or missing IP of pod %s", podFullName)
}
podIP := podPortMapping.IP.String()
if err = ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil {
return err
}
// Ensure atomicity for port opening and iptables operations
hm.mu.Lock()
defer hm.mu.Unlock()
// try to open hostports
ports, err := openHostports(hm.portOpener, podPortMapping)
if err != nil {
return err
}
for hostport, socket := range ports {
hm.hostPortMap[hostport] = socket
}
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
existingChains, existingRules, err := getExistingHostportIPTablesRules(hm.iptables)
if err != nil {
// clean up opened host ports if we encounter any error
return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)})
}
newChains := []utiliptables.Chain{}
for _, pm := range hostportMappings {
protocol := strings.ToLower(string(pm.Protocol))
chain := getHostportChain(id, pm)
newChains = append(newChains, chain)
// Add new hostport chain
writeLine(natChains, utiliptables.MakeChainLine(chain))
// Prepend the new chain to KUBE-HOSTPORTS
// This avoids any leaking iptables rule that takes up the same port
writeLine(natRules, "-I", string(kubeHostportsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort),
"-m", protocol, "-p", protocol, "--dport", fmt.Sprintf("%d", pm.HostPort),
"-j", string(chain),
)
// SNAT if the traffic comes from the pod itself
writeLine(natRules, "-A", string(chain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort),
"-s", podIP,
"-j", string(iptablesproxy.KubeMarkMasqChain))
// DNAT to the podIP:containerPort
writeLine(natRules, "-A", string(chain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort),
"-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", podIP, pm.ContainerPort))
}
// getHostportChain should provide a unique hostport chain name using a hash.
// If there is a chain conflict, or multiple Adds have been triggered for a single pod,
// the filtering below should avoid further problems.
filterChains(existingChains, newChains)
existingRules = filterRules(existingRules, newChains)
for _, chain := range existingChains {
writeLine(natChains, chain)
}
for _, rule := range existingRules {
writeLine(natRules, rule)
}
writeLine(natRules, "COMMIT")
if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil {
// clean up opened host ports if we encounter any error
return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)})
}
return nil
}
func (hm *hostportManager) Remove(id string, podPortMapping *PodPortMapping) (err error) {
if podPortMapping == nil || podPortMapping.HostNetwork {
return nil
}
hostportMappings := gatherHostportMappings(podPortMapping)
if len(hostportMappings) <= 0 {
return nil
}
// Ensure atomicity for port closing and iptables operations
hm.mu.Lock()
defer hm.mu.Unlock()
var existingChains map[utiliptables.Chain]string
var existingRules []string
existingChains, existingRules, err = getExistingHostportIPTablesRules(hm.iptables)
if err != nil {
return err
}
// Gather target hostport chains for removal
chainsToRemove := []utiliptables.Chain{}
for _, pm := range hostportMappings {
chainsToRemove = append(chainsToRemove, getHostportChain(id, pm))
// To preserve backward compatibility for k8s 1.5 or earlier.
// Need to remove hostport chains added by hostportSyncer if there is any
// TODO: remove this in 1.7
chainsToRemove = append(chainsToRemove, hostportChainName(pm, getPodFullName(podPortMapping)))
}
// filter out rules that reference the target chains
remainingRules := filterRules(existingRules, chainsToRemove)
// gather the target hostport chains that exist in the iptables-save result
existingChainsToRemove := []utiliptables.Chain{}
for _, chain := range chainsToRemove {
if _, ok := existingChains[chain]; ok {
existingChainsToRemove = append(existingChainsToRemove, chain)
}
}
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
for _, chain := range existingChains {
writeLine(natChains, chain)
}
for _, rule := range remainingRules {
writeLine(natRules, rule)
}
for _, chain := range existingChainsToRemove {
writeLine(natRules, "-X", string(chain))
}
writeLine(natRules, "COMMIT")
if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil {
return err
}
// clean up opened pod host ports
return hm.closeHostports(hostportMappings)
}
// syncIPTables executes iptables-restore with given lines
func (hm *hostportManager) syncIPTables(lines []byte) error {
glog.V(3).Infof("Restoring iptables rules: %s", lines)
err := hm.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
return fmt.Errorf("Failed to execute iptables-restore: %v", err)
}
return nil
}
// closeHostports tries to close all the listed host ports
// TODO: move closeHostports and openHostports into a common struct
func (hm *hostportManager) closeHostports(hostportMappings []*PortMapping) error {
errList := []error{}
for _, pm := range hostportMappings {
hp := portMappingToHostport(pm)
if socket, ok := hm.hostPortMap[hp]; ok {
glog.V(2).Infof("Closing host port %s", hp.String())
if err := socket.Close(); err != nil {
errList = append(errList, fmt.Errorf("failed to close host port %s: %v", hp.String(), err))
continue
}
delete(hm.hostPortMap, hp)
}
}
return utilerrors.NewAggregate(errList)
}
// getHostportChain takes id, hostport and protocol for a pod and returns associated iptables chain.
// This is computed by hashing (sha256) then encoding to base32 and truncating with the prefix
// "KUBE-HP-". We do this because IPTables Chain Names must be <= 28 chars long, and the longer
// they are the harder they are to read.
// WARNING: Please do not change this function. Otherwise, HostportManager may not be able to
// identify existing iptables chains.
func getHostportChain(id string, pm *PortMapping) utiliptables.Chain {
hash := sha256.Sum256([]byte(id + string(pm.HostPort) + string(pm.Protocol)))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16])
}
// gatherHostportMappings returns all the PortMappings which have a hostport for a pod
func gatherHostportMappings(podPortMapping *PodPortMapping) []*PortMapping {
mappings := []*PortMapping{}
for _, pm := range podPortMapping.PortMappings {
if pm.HostPort <= 0 {
continue
}
mappings = append(mappings, pm)
}
return mappings
}
// getExistingHostportIPTablesRules retrieves raw data from iptables-save, parses it,
// and returns all the hostport-related chains and rules
func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[utiliptables.Chain]string, []string, error) {
iptablesSaveRaw, err := iptables.Save(utiliptables.TableNAT)
if err != nil { // if we failed to get any rules
return nil, nil, fmt.Errorf("failed to execute iptables-save: %v", err)
}
existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
existingHostportChains := make(map[utiliptables.Chain]string)
existingHostportRules := []string{}
for chain := range existingNATChains {
if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) {
existingHostportChains[chain] = existingNATChains[chain]
}
}
for _, line := range strings.Split(string(iptablesSaveRaw), "\n") {
if strings.HasPrefix(line, fmt.Sprintf("-A %s", kubeHostportChainPrefix)) ||
strings.HasPrefix(line, fmt.Sprintf("-A %s", string(kubeHostportsChain))) {
existingHostportRules = append(existingHostportRules, line)
}
}
return existingHostportChains, existingHostportRules, nil
}
// filterRules filters input rules with input chains. Rules that did not involve any filter chain will be returned.
// The order of the input rules is important and is preserved.
func filterRules(rules []string, filters []utiliptables.Chain) []string {
filtered := []string{}
for _, rule := range rules {
skip := false
for _, filter := range filters {
if strings.Contains(rule, string(filter)) {
skip = true
break
}
}
if !skip {
filtered = append(filtered, rule)
}
}
return filtered
}
// filterChains deletes all entries of filter chains from chain map
func filterChains(chains map[utiliptables.Chain]string, filterChains []utiliptables.Chain) {
for _, chain := range filterChains {
if _, ok := chains[chain]; ok {
delete(chains, chain)
}
}
}
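
To see the chain-naming scheme from getHostportChain in action, here is a standalone sketch (not part of the PR; the sandbox ID and port are invented). Note that string(hostPort) converts the int32 as a Unicode code point rather than as decimal digits, exactly as the code above does; the result is still deterministic, which is all the naming scheme needs:

package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

func main() {
	id := "f54d2ab1"        // hypothetical podSandboxID
	hostPort := int32(8080) // the pod's requested host port
	protocol := "TCP"
	// sha256 over id + port + protocol, base32-encoded, truncated to 16 chars;
	// with the "KUBE-HP-" prefix the name is 24 chars, within iptables' 28-char limit.
	hash := sha256.Sum256([]byte(id + string(hostPort) + protocol))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	fmt.Println("KUBE-HP-" + encoded[:16])
}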

View File

@ -0,0 +1,197 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hostport
import (
"net"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api/v1"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"strings"
)
func NewFakeHostportManager() HostPortManager {
return &hostportManager{
hostPortMap: make(map[hostport]closeable),
iptables: NewFakeIPTables(),
portOpener: NewFakeSocketManager().openFakeSocket,
}
}
func TestHostportManager(t *testing.T) {
iptables := NewFakeIPTables()
portOpener := NewFakeSocketManager()
manager := &hostportManager{
hostPortMap: make(map[hostport]closeable),
iptables: iptables,
portOpener: portOpener.openFakeSocket,
}
testCases := []struct {
mapping *PodPortMapping
expectError bool
}{
{
mapping: &PodPortMapping{
Name: "pod1",
Namespace: "ns1",
IP: net.ParseIP("10.1.1.2"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8080,
ContainerPort: 80,
Protocol: v1.ProtocolTCP,
},
{
HostPort: 8081,
ContainerPort: 81,
Protocol: v1.ProtocolUDP,
},
},
},
expectError: false,
},
{
mapping: &PodPortMapping{
Name: "pod2",
Namespace: "ns1",
IP: net.ParseIP("10.1.1.3"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8082,
ContainerPort: 80,
Protocol: v1.ProtocolTCP,
},
{
HostPort: 8081,
ContainerPort: 81,
Protocol: v1.ProtocolUDP,
},
},
},
expectError: true,
},
{
mapping: &PodPortMapping{
Name: "pod3",
Namespace: "ns1",
IP: net.ParseIP("10.1.1.4"),
HostNetwork: false,
PortMappings: []*PortMapping{
{
HostPort: 8443,
ContainerPort: 443,
Protocol: v1.ProtocolTCP,
},
},
},
expectError: false,
},
}
// Add Hostports
for _, tc := range testCases {
err := manager.Add("id", tc.mapping, "cbr0")
if tc.expectError {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
}
// Check port opened
expectedPorts := []hostport{{8080, "tcp"}, {8081, "udp"}, {8443, "tcp"}}
openedPorts := make(map[hostport]bool)
for hp, port := range portOpener.mem {
if !port.closed {
openedPorts[hp] = true
}
}
assert.EqualValues(t, len(openedPorts), len(expectedPorts))
for _, hp := range expectedPorts {
_, ok := openedPorts[hp]
assert.EqualValues(t, true, ok)
}
// Check Iptables-save result after adding hostports
raw, err := iptables.Save(utiliptables.TableNAT)
assert.NoError(t, err)
lines := strings.Split(string(raw), "\n")
expectedLines := map[string]bool{
`*nat`: true,
`:KUBE-HOSTPORTS - [0:0]`: true,
`:OUTPUT - [0:0]`: true,
`:PREROUTING - [0:0]`: true,
`:POSTROUTING - [0:0]`: true,
`:KUBE-HP-4YVONL46AKYWSKS3 - [0:0]`: true,
`:KUBE-HP-7THKRFSEH4GIIXK7 - [0:0]`: true,
`:KUBE-HP-5N7UH5JAXCVP5UJR - [0:0]`: true,
"-A KUBE-HOSTPORTS -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp --dport 8443 -j KUBE-HP-5N7UH5JAXCVP5UJR": true,
"-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp --dport 8081 -j KUBE-HP-7THKRFSEH4GIIXK7": true,
"-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp --dport 8080 -j KUBE-HP-4YVONL46AKYWSKS3": true,
"-A OUTPUT -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true,
"-A PREROUTING -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true,
"-A POSTROUTING -m comment --comment \"SNAT for localhost access to hostports\" -o cbr0 -s 127.0.0.0/8 -j MASQUERADE": true,
"-A KUBE-HP-4YVONL46AKYWSKS3 -m comment --comment \"pod1_ns1 hostport 8080\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ": true,
"-A KUBE-HP-4YVONL46AKYWSKS3 -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.2:80": true,
"-A KUBE-HP-7THKRFSEH4GIIXK7 -m comment --comment \"pod1_ns1 hostport 8081\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ": true,
"-A KUBE-HP-7THKRFSEH4GIIXK7 -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp -j DNAT --to-destination 10.1.1.2:81": true,
"-A KUBE-HP-5N7UH5JAXCVP5UJR -m comment --comment \"pod3_ns1 hostport 8443\" -s 10.1.1.4/32 -j KUBE-MARK-MASQ": true,
"-A KUBE-HP-5N7UH5JAXCVP5UJR -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.4:443": true,
`COMMIT`: true,
}
for _, line := range lines {
if len(strings.TrimSpace(line)) > 0 {
_, ok := expectedLines[strings.TrimSpace(line)]
assert.EqualValues(t, true, ok)
}
}
// Remove all added hostports
for _, tc := range testCases {
if !tc.expectError {
err := manager.Remove("id", tc.mapping)
assert.NoError(t, err)
}
}
// Check Iptables-save result after deleting hostports
raw, err = iptables.Save(utiliptables.TableNAT)
assert.NoError(t, err)
lines = strings.Split(string(raw), "\n")
remainingChains := make(map[string]bool)
for _, line := range lines {
if strings.HasPrefix(line, ":") {
remainingChains[strings.TrimSpace(line)] = true
}
}
expectDeletedChains := []string{"KUBE-HP-4YVONL46AKYWSKS3", "KUBE-HP-7THKRFSEH4GIIXK7", "KUBE-HP-5N7UH5JAXCVP5UJR"}
for _, chain := range expectDeletedChains {
_, ok := remainingChains[chain]
assert.EqualValues(t, false, ok)
}
// check if all ports are closed
for _, port := range portOpener.mem {
assert.EqualValues(t, true, port.closed)
}
}

View File

@@ -21,26 +21,17 @@ import (
"crypto/sha256"
"encoding/base32"
"fmt"
"net"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
const (
// the hostport chain
kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS"
// prefix for hostport chains
kubeHostportChainPrefix string = "KUBE-HP-"
)
// HostportSyncer takes a list of PodPortMappings and implements hostport all at once
type HostportSyncer interface {
// SyncHostports gathers all hostports on the node and sets up iptables rules to enable them.
@@ -52,26 +43,6 @@ type HostportSyncer interface {
OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error
}
// PortMapping represents a network port in a container
type PortMapping struct {
Name string
HostPort int32
ContainerPort int32
Protocol v1.Protocol
HostIP string
}
// PodPortMapping represents a pod's network state and associated container port mappings
type PodPortMapping struct {
Namespace string
Name string
PortMappings []*PortMapping
HostNetwork bool
IP net.IP
}
type hostportOpener func(*hostport) (closeable, error)
type hostportSyncer struct {
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
@@ -87,15 +58,6 @@ func NewHostportSyncer() HostportSyncer {
}
}
type closeable interface {
Close() error
}
type hostport struct {
port int32
protocol string
}
type targetPod struct {
podFullName string
podIP string
@@ -120,7 +82,7 @@ func (h *hostportSyncer) openHostports(podHostportMapping *PodPortMapping) error
}
socket, err := h.portOpener(&hp)
if err != nil {
retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err) retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err)
break
}
ports[hp] = socket
@@ -222,31 +184,8 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
return err
}
glog.V(4).Info("Ensuring kubelet hostport chains")
// Ensure kubeHostportChain
if _, err := h.iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err)
}
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
args := []string{"-m", "comment", "--comment", "kube hostport portals",
"-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(kubeHostportsChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := h.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err)
}
}
// Need to SNAT traffic from localhost
args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"}
if _, err := h.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
}
// Ensure KUBE-HOSTPORTS chains
ensureKubeHostportChains(h.iptables, natInterfaceName)
// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
@@ -341,44 +280,6 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap
return nil
}
func openLocalPort(hp *hostport) (closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket closeable
switch hp.protocol {
case "tcp":
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
socket = listener
case "udp":
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", hp.protocol)
}
glog.V(3).Infof("Opened local port %s", hp.String())
return socket, nil
}
// cleanupHostportMap closes obsolete hostports
func (h *hostportSyncer) cleanupHostportMap(containerPortMap map[*PortMapping]targetPod) {
// compute hostports that are supposed to be open

View File

@@ -17,7 +17,6 @@ limitations under the License.
package hostport
import (
"fmt"
"net" "net"
"reflect" "reflect"
"strings" "strings"
@ -27,24 +26,6 @@ import (
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
) )
type fakeSocket struct {
port int32
protocol string
closed bool
}
func (f *fakeSocket) Close() error {
if f.closed {
return fmt.Errorf("Socket %q.%s already closed!", f.port, f.protocol)
}
f.closed = true
return nil
}
func openFakeSocket(hp *hostport) (closeable, error) {
return &fakeSocket{hp.port, hp.protocol, false}, nil
}
type ruleMatch struct {
hostport int
chain string
@@ -53,11 +34,12 @@ type ruleMatch struct {
func TestOpenPodHostports(t *testing.T) {
fakeIPTables := NewFakeIPTables()
fakeOpener := NewFakeSocketManager()
h := &hostportSyncer{
hostPortMap: make(map[hostport]closeable),
iptables: fakeIPTables,
portOpener: openFakeSocket,
portOpener: fakeOpener.openFakeSocket,
}
tests := []struct {

View File

@ -0,0 +1,152 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hostport
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api/v1"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
type fakeSocket struct {
port int32
protocol string
closed bool
}
func (f *fakeSocket) Close() error {
if f.closed {
return fmt.Errorf("Socket %q.%s already closed!", f.port, f.protocol)
}
f.closed = true
return nil
}
func NewFakeSocketManager() *fakeSocketManager {
return &fakeSocketManager{mem: make(map[hostport]*fakeSocket)}
}
type fakeSocketManager struct {
mem map[hostport]*fakeSocket
}
func (f *fakeSocketManager) openFakeSocket(hp *hostport) (closeable, error) {
if socket, ok := f.mem[*hp]; ok && !socket.closed {
return nil, fmt.Errorf("hostport is occupied")
}
fs := &fakeSocket{hp.port, hp.protocol, false}
f.mem[*hp] = fs
return fs, nil
}
func TestOpenHostports(t *testing.T) {
opener := NewFakeSocketManager()
testCases := []struct {
podPortMapping *PodPortMapping
expectError bool
}{
{
&PodPortMapping{
Namespace: "ns1",
Name: "n0",
},
false,
},
{
&PodPortMapping{
Namespace: "ns1",
Name: "n1",
PortMappings: []*PortMapping{
{HostPort: 80, Protocol: v1.Protocol("TCP")},
{HostPort: 8080, Protocol: v1.Protocol("TCP")},
{HostPort: 443, Protocol: v1.Protocol("TCP")},
},
},
false,
},
{
&PodPortMapping{
Namespace: "ns1",
Name: "n2",
PortMappings: []*PortMapping{
{HostPort: 80, Protocol: v1.Protocol("TCP")},
},
},
true,
},
{
&PodPortMapping{
Namespace: "ns1",
Name: "n3",
PortMappings: []*PortMapping{
{HostPort: 8081, Protocol: v1.Protocol("TCP")},
{HostPort: 8080, Protocol: v1.Protocol("TCP")},
},
},
true,
},
{
&PodPortMapping{
Namespace: "ns1",
Name: "n3",
PortMappings: []*PortMapping{
{HostPort: 8081, Protocol: v1.Protocol("TCP")},
},
},
false,
},
}
for _, tc := range testCases {
mapping, err := openHostports(opener.openFakeSocket, tc.podPortMapping)
if tc.expectError {
assert.Error(t, err)
continue
}
assert.NoError(t, err)
assert.EqualValues(t, len(mapping), len(tc.podPortMapping.PortMappings))
}
}
func TestEnsureKubeHostportChains(t *testing.T) {
interfaceName := "cbr0"
builtinChains := []string{"PREROUTING", "OUTPUT"}
jumpRule := "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS"
masqRule := "-m comment --comment \"SNAT for localhost access to hostports\" -o cbr0 -s 127.0.0.0/8 -j MASQUERADE"
fakeIPTables := NewFakeIPTables()
assert.NoError(t, ensureKubeHostportChains(fakeIPTables, interfaceName))
_, _, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain("KUBE-HOSTPORTS"))
assert.NoError(t, err)
_, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.ChainPostrouting)
assert.NoError(t, err)
assert.EqualValues(t, len(chain.rules), 1)
assert.Contains(t, chain.rules[0], masqRule)
for _, chainName := range builtinChains {
_, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain(chainName))
assert.NoError(t, err)
assert.EqualValues(t, len(chain.rules), 1)
assert.Contains(t, chain.rules[0], jumpRule)
}
}

View File

@@ -89,7 +89,11 @@ type kubenetNetworkPlugin struct {
execer utilexec.Interface
nsenterPath string
hairpinMode componentconfig.HairpinMode
// kubenet can use either hostportSyncer or hostportManager to implement hostports.
// Currently, if the network host supports legacy features, hostportSyncer will be used;
// otherwise, hostportManager will be used.
hostportSyncer hostport.HostportSyncer
hostportManager hostport.HostPortManager
iptables utiliptables.Interface
sysctl utilsysctl.Interface
ebtables utilebtables.Interface
@@ -114,6 +118,7 @@ func NewPlugin(networkPluginDir string) network.NetworkPlugin {
sysctl: sysctl,
vendorDir: networkPluginDir,
hostportSyncer: hostport.NewHostportSyncer(),
hostportManager: hostport.NewHostportManager(),
nonMasqueradeCIDR: "10.0.0.0/8",
}
}
@@ -356,35 +361,48 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
// The host can choose to not support "legacy" features. The remote
// shim doesn't support it (#35457), but the kubelet does.
if !plugin.host.SupportsLegacyFeatures() {
return nil
}
// The first SetUpPod call creates the bridge; get a shaper for the sake of
// initialization
shaper := plugin.shaper()
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
}
if egress != nil || ingress != nil {
if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
}
}
// Open any hostports the pod's containers want
activePodPortMapping, err := plugin.getPodPortMapping()
if err != nil {
return err
}
newPodPortMapping := constructPodPortMapping(pod, ip4)
if err := plugin.hostportSyncer.OpenPodHostportsAndSync(newPodPortMapping, BridgeName, activePodPortMapping); err != nil {
return err
}
if plugin.host.SupportsLegacyFeatures() {
// The first SetUpPod call creates the bridge; get a shaper for the sake of
// initialization
shaper := plugin.shaper()
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
}
if egress != nil || ingress != nil {
if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
}
}
// Open any hostports the pod's containers want
activePodPortMapping, err := plugin.getPodPortMapping()
if err != nil {
return err
}
newPodPortMapping := constructPodPortMapping(pod, ip4)
if err := plugin.hostportSyncer.OpenPodHostportsAndSync(newPodPortMapping, BridgeName, activePodPortMapping); err != nil {
return err
}
} else {
portMappings, err := plugin.host.GetPodPortMappings(id.ID)
if err != nil {
return err
}
if portMappings != nil && len(portMappings) > 0 {
if err := plugin.hostportManager.Add(id.ID, &hostport.PodPortMapping{
Namespace: namespace,
Name: name,
PortMappings: portMappings,
IP: ip4,
HostNetwork: false,
}, BridgeName); err != nil {
return err
}
}
}
return nil
}
@@ -467,18 +485,29 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
// The host can choose to not support "legacy" features. The remote
// shim doesn't support it (#35457), but the kubelet does.
if !plugin.host.SupportsLegacyFeatures() {
return utilerrors.NewAggregate(errList)
}
activePodPortMapping, err := plugin.getPodPortMapping()
if err == nil {
err = plugin.hostportSyncer.SyncHostports(BridgeName, activePodPortMapping)
}
if err != nil {
errList = append(errList, err)
}
if plugin.host.SupportsLegacyFeatures() {
activePodPortMapping, err := plugin.getPodPortMapping()
if err == nil {
err = plugin.hostportSyncer.SyncHostports(BridgeName, activePodPortMapping)
}
if err != nil {
errList = append(errList, err)
}
} else {
portMappings, err := plugin.host.GetPodPortMappings(id.ID)
if err != nil {
errList = append(errList, err)
} else if portMappings != nil && len(portMappings) > 0 {
if err = plugin.hostportManager.Remove(id.ID, &hostport.PodPortMapping{
Namespace: namespace,
Name: name,
PortMappings: portMappings,
HostNetwork: false,
}); err != nil {
errList = append(errList, err)
}
}
}
return utilerrors.NewAggregate(errList)
}

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)
@@ -111,9 +112,9 @@ type LegacyHost interface {
// Only used for hostport management
GetRuntime() kubecontainer.Runtime
// SupportsLegacyFeaturs returns true if this host can support hostports
// and bandwidth shaping. Both will either get added to CNI or dropped,
// so differnt implementations can choose to ignore them.
// SupportsLegacyFeatures returns true if the network host supports the GetPodByName and KubeClient interfaces and the kubelet
// runtime interface. These interfaces will no longer be implemented by CRI shims.
// This function helps network plugins to choose their behavior based on runtime.
SupportsLegacyFeatures() bool
} }
@@ -121,17 +122,19 @@ type LegacyHost interface {
// TODO(#35457): get rid of this backchannel to the kubelet. The scope of
// the back channel is restricted to host-ports/testing, and restricted
// to kubenet. No other network plugin wrapper needs it. Other plugins
// only require a way to access namespace information, which they can do
// directly through the embedded NamespaceGetter.
// only require a way to access namespace information and port mapping
// information, which they can do directly through the embedded interfaces.
type Host interface {
// NamespaceGetter is a getter for sandbox namespace information.
// It's the only part of this interface that isn't currently deprecated.
NamespaceGetter
// PortMappingGetter is a getter for sandbox port mapping information.
PortMappingGetter
// LegacyHost contains methods that trap back into the Kubelet. Dependence
// *do not* add more dependencies in this interface. In a post-cri world,
// network plugins will be invoked by the runtime shim, and should only
// require NamespaceGetter.
// require GetNetNS and GetPodPortMappings.
LegacyHost
}
@@ -143,6 +146,14 @@ type NamespaceGetter interface {
GetNetNS(containerID string) (string, error)
}
// PortMappingGetter is an interface to retrieve port mapping information for a given
// sandboxID. Typically implemented by runtime shims that are closely coupled to
// CNI plugin wrappers like kubenet.
type PortMappingGetter interface {
// GetPodPortMappings returns sandbox port mappings information.
GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error)
}
// InitNetworkPlugin inits the plugin that matches networkPluginName. Plugins must have unique names.
func InitNetworkPlugin(plugins []NetworkPlugin, networkPluginName string, host Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) (NetworkPlugin, error) {
if networkPluginName == "" {
@@ -276,3 +287,9 @@ func GetPodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName s
return ip, nil
}
type NoopPortMappingGetter struct{}
func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
return nil, nil
}

View File

@@ -16,6 +16,7 @@ go_library(
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/network/hostport:go_default_library",
],
)

View File

@@ -24,10 +24,12 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)
type fakeNetworkHost struct {
fakeNamespaceGetter
FakePortMappingGetter
kubeClient clientset.Interface
Legacy bool
Runtime *containertest.FakeRuntime
@@ -61,3 +63,11 @@ type fakeNamespaceGetter struct {
func (nh *fakeNamespaceGetter) GetNetNS(containerID string) (string, error) {
return nh.ns, nil
}
type FakePortMappingGetter struct {
mem map[string][]*hostport.PortMapping
}
func (pm *FakePortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
return pm.mem[containerID], nil
}

View File

@@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
)
// This just exports required functions from kubelet proper, for use by network // This just exports required functions from kubelet proper, for use by network
@@ -54,6 +55,8 @@ func (nh *networkHost) SupportsLegacyFeatures() bool {
// methods, because networkHost is slated for deletion.
type criNetworkHost struct {
*networkHost
// criNetworkHost currently supports legacy features. Hence there is no need to support PortMappingGetter.
*network.NoopPortMappingGetter
}
// GetNetNS returns the network namespace of the given containerID.