Merge pull request #26934 from dcbw/split-hostport

Automatic merge from submit-queue

kubelet/kubenet: split hostport handling into separate module

This pulls the hostport functionality of kubenet out into a separate module so that it can be more easily tested and potentially used by other code (maybe CNI, maybe downstream consumers like OpenShift, etc.). I couldn't find an existing mock iptables, so I wrote one, but I didn't look very hard.
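The new module exposes a small HostportHandler interface. For a downstream consumer the intended flow looks roughly like the sketch below; newPod and runningPods are hypothetical placeholders, and "cbr0" stands in for whatever NAT interface name the caller uses:

    import "k8s.io/kubernetes/pkg/kubelet/network/hostport"

    // Create a handler backed by the real iptables implementation.
    handler := hostport.NewHostportHandler()

    // Open the new pod's hostports and rebuild the KUBE-HOSTPORTS NAT rules
    // from the full set of running pods; stale hostports are closed afterwards.
    if err := handler.OpenPodHostportsAndSync(newPod, "cbr0", runningPods); err != nil {
        // handle the error
    }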

@freehan @thockin @bprashanth
Merged by k8s-merge-robot on 2016-06-18 15:24:57 -07:00 (committed by GitHub)
Commit d80b60ef7c
6 changed files with 1119 additions and 392 deletions


@@ -0,0 +1,335 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hostport
import (
"bytes"
"fmt"
"net"
"strings"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
type fakeChain struct {
name utiliptables.Chain
rules []string
}
type fakeTable struct {
name utiliptables.Table
chains map[string]*fakeChain
}
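// fakeIptables implements enough of utiliptables.Interface in memory to let
// the hostport sync logic be exercised in unit tests without touching the
// host's real netfilter state.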
type fakeIptables struct {
tables map[string]*fakeTable
}
func NewFakeIptables() *fakeIptables {
return &fakeIptables{
tables: make(map[string]*fakeTable, 0),
}
}
func (f *fakeIptables) GetVersion() (string, error) {
return "1.4.21", nil
}
func (f *fakeIptables) getTable(tableName utiliptables.Table) (*fakeTable, error) {
table, ok := f.tables[string(tableName)]
if !ok {
return nil, fmt.Errorf("Table %s does not exist", tableName)
}
return table, nil
}
func (f *fakeIptables) getChain(tableName utiliptables.Table, chainName utiliptables.Chain) (*fakeTable, *fakeChain, error) {
table, err := f.getTable(tableName)
if err != nil {
return nil, nil, err
}
chain, ok := table.chains[string(chainName)]
if !ok {
return table, nil, fmt.Errorf("Chain %s/%s does not exist", tableName, chainName)
}
return table, chain, nil
}
func (f *fakeIptables) ensureChain(tableName utiliptables.Table, chainName utiliptables.Chain) (bool, *fakeChain) {
table, chain, err := f.getChain(tableName, chainName)
if err != nil {
// either table or table+chain don't exist yet
if table == nil {
table = &fakeTable{
name: tableName,
chains: make(map[string]*fakeChain),
}
f.tables[string(tableName)] = table
}
chain := &fakeChain{
name: chainName,
rules: make([]string, 0),
}
table.chains[string(chainName)] = chain
return false, chain
}
return true, chain
}
func (f *fakeIptables) EnsureChain(tableName utiliptables.Table, chainName utiliptables.Chain) (bool, error) {
existed, _ := f.ensureChain(tableName, chainName)
return existed, nil
}
func (f *fakeIptables) FlushChain(tableName utiliptables.Table, chainName utiliptables.Chain) error {
_, chain, err := f.getChain(tableName, chainName)
if err != nil {
return err
}
chain.rules = make([]string, 0)
return nil
}
func (f *fakeIptables) DeleteChain(tableName utiliptables.Table, chainName utiliptables.Chain) error {
table, _, err := f.getChain(tableName, chainName)
if err != nil {
return err
}
delete(table.chains, string(chainName))
return nil
}
// Returns index of rule in array; < 0 if rule is not found
func findRule(chain *fakeChain, rule string) int {
for i, candidate := range chain.rules {
if rule == candidate {
return i
}
}
return -1
}
func (f *fakeIptables) ensureRule(position utiliptables.RulePosition, tableName utiliptables.Table, chainName utiliptables.Chain, rule string) (bool, error) {
_, chain, err := f.getChain(tableName, chainName)
if err != nil {
_, chain = f.ensureChain(tableName, chainName)
}
rule, err = normalizeRule(rule)
if err != nil {
return false, err
}
ruleIdx := findRule(chain, rule)
if ruleIdx >= 0 {
return true, nil
}
if position == utiliptables.Prepend {
chain.rules = append([]string{rule}, chain.rules...)
} else if position == utiliptables.Append {
chain.rules = append(chain.rules, rule)
} else {
return false, fmt.Errorf("Unknown position argument %q", position)
}
return false, nil
}
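// normalizeRule rewrites a rule string the way iptables-save would print it:
// arguments are split on spaces (quoted segments are kept intact, including
// their quotes), "--to-destination=IP:port" becomes two space-separated
// arguments, and bare IP addresses get a "/32" suffix. An illustrative
// example: `-s 10.1.1.2 -j KUBE-MARK-MASQ` becomes
// `-s 10.1.1.2/32 -j KUBE-MARK-MASQ`.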
func normalizeRule(rule string) (string, error) {
normalized := ""
remaining := strings.TrimSpace(rule)
for {
var end int
if strings.HasPrefix(remaining, "--to-destination=") {
remaining = strings.Replace(remaining, "=", " ", 1)
}
if remaining[0] == '"' {
end = strings.Index(remaining[1:], "\"")
if end < 0 {
return "", fmt.Errorf("Invalid rule syntax: mismatched quotes")
}
end += 2
} else {
end = strings.Index(remaining, " ")
if end < 0 {
end = len(remaining)
}
}
arg := remaining[:end]
// Normalize un-prefixed IP addresses like iptables does
if net.ParseIP(arg) != nil {
arg = arg + "/32"
}
if len(normalized) > 0 {
normalized += " "
}
normalized += strings.TrimSpace(arg)
if len(remaining) == end {
break
}
remaining = remaining[end+1:]
}
return normalized, nil
}
func (f *fakeIptables) EnsureRule(position utiliptables.RulePosition, tableName utiliptables.Table, chainName utiliptables.Chain, args ...string) (bool, error) {
ruleArgs := make([]string, 0)
for _, arg := range args {
// quote args with internal spaces (like comments)
if strings.Index(arg, " ") >= 0 {
arg = fmt.Sprintf("\"%s\"", arg)
}
ruleArgs = append(ruleArgs, arg)
}
return f.ensureRule(position, tableName, chainName, strings.Join(ruleArgs, " "))
}
func (f *fakeIptables) DeleteRule(tableName utiliptables.Table, chainName utiliptables.Chain, args ...string) error {
_, chain, err := f.getChain(tableName, chainName)
if err == nil {
rule := strings.Join(args, " ")
ruleIdx := findRule(chain, rule)
if ruleIdx < 0 {
return nil
}
chain.rules = append(chain.rules[:ruleIdx], chain.rules[ruleIdx+1:]...)
}
return nil
}
func (f *fakeIptables) IsIpv6() bool {
return false
}
func saveChain(chain *fakeChain, data *bytes.Buffer) {
for _, rule := range chain.rules {
data.WriteString(fmt.Sprintf("-A %s %s\n", chain.name, rule))
}
}
func (f *fakeIptables) Save(tableName utiliptables.Table) ([]byte, error) {
table, err := f.getTable(tableName)
if err != nil {
return nil, err
}
data := bytes.NewBuffer(nil)
data.WriteString(fmt.Sprintf("*%s\n", table.name))
rules := bytes.NewBuffer(nil)
for _, chain := range table.chains {
data.WriteString(fmt.Sprintf(":%s - [0:0]\n", string(chain.name)))
saveChain(chain, rules)
}
data.Write(rules.Bytes())
data.WriteString("COMMIT\n")
return data.Bytes(), nil
}
func (f *fakeIptables) SaveAll() ([]byte, error) {
data := bytes.NewBuffer(nil)
for _, table := range f.tables {
tableData, err := f.Save(table.name)
if err != nil {
return nil, err
}
if _, err = data.Write(tableData); err != nil {
return nil, err
}
}
return data.Bytes(), nil
}
func (f *fakeIptables) restore(restoreTableName utiliptables.Table, data []byte, flush utiliptables.FlushFlag) error {
buf := bytes.NewBuffer(data)
var tableName utiliptables.Table
for {
line, err := buf.ReadString('\n')
if err != nil {
break
}
if line[0] == '#' {
continue
}
line = strings.TrimSuffix(line, "\n")
if strings.HasPrefix(line, "*") {
tableName = utiliptables.Table(line[1:])
}
if tableName != "" {
if restoreTableName != "" && restoreTableName != tableName {
continue
}
if strings.HasPrefix(line, ":") {
chainName := utiliptables.Chain(strings.Split(line[1:], " ")[0])
if flush == utiliptables.FlushTables {
table, chain, _ := f.getChain(tableName, chainName)
if chain != nil {
delete(table.chains, string(chainName))
}
}
_, _ = f.ensureChain(tableName, chainName)
} else if strings.HasPrefix(line, "-A") {
parts := strings.Split(line, " ")
if len(parts) < 3 {
return fmt.Errorf("Invalid iptables rule '%s'", line)
}
chainName := utiliptables.Chain(parts[1])
rule := strings.TrimPrefix(line, fmt.Sprintf("-A %s ", chainName))
_, err := f.ensureRule(utiliptables.Append, tableName, chainName, rule)
if err != nil {
return err
}
} else if strings.HasPrefix(line, "-X") {
parts := strings.Split(line, " ")
if len(parts) < 3 {
return fmt.Errorf("Invalid iptables rule '%s'", line)
}
if err := f.DeleteChain(tableName, utiliptables.Chain(parts[1])); err != nil {
return err
}
} else if line == "COMMIT" {
if restoreTableName == tableName {
return nil
}
tableName = ""
}
}
}
return nil
}
func (f *fakeIptables) Restore(tableName utiliptables.Table, data []byte, flush utiliptables.FlushFlag, counters utiliptables.RestoreCountersFlag) error {
return f.restore(tableName, data, flush)
}
func (f *fakeIptables) RestoreAll(data []byte, flush utiliptables.FlushFlag, counters utiliptables.RestoreCountersFlag) error {
return f.restore("", data, flush)
}
func (f *fakeIptables) AddReloadFunc(reloadFunc func()) {
}
func (f *fakeIptables) Destroy() {
}


@@ -0,0 +1,379 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hostport
import (
"bytes"
"crypto/sha256"
"encoding/base32"
"fmt"
"net"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
const (
// the hostport chain
kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS"
// prefix for hostport chains
kubeHostportChainPrefix string = "KUBE-HP-"
)
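// HostportHandler opens host ports for pods and keeps the KUBE-HOSTPORTS
// iptables rules in sync with the set of running pods.
// OpenPodHostportsAndSync additionally opens the new pod's host ports before
// syncing; SyncHostports only rebuilds the rules and closes stale ports.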
type HostportHandler interface {
OpenPodHostportsAndSync(newPod *api.Pod, natInterfaceName string, runningPods []*RunningPod) error
SyncHostports(natInterfaceName string, runningPods []*RunningPod) error
}
type RunningPod struct {
Pod *api.Pod
IP net.IP
}
type hostportOpener func(*hostport) (closeable, error)
type handler struct {
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
portOpener hostportOpener
}
func NewHostportHandler() HostportHandler {
iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4)
return &handler{
hostPortMap: make(map[hostport]closeable),
iptables: iptInterface,
portOpener: openLocalPort,
}
}
type closeable interface {
Close() error
}
type hostport struct {
port int32
protocol string
}
type targetPod struct {
podFullName string
podIP string
}
func (hp *hostport) String() string {
return fmt.Sprintf("%s:%d", hp.protocol, hp.port)
}
// openHostports opens all hostports for the pod and records the opened sockets
// in h.hostPortMap; on any error it closes the ports it just opened instead.
func (h *handler) openHostports(pod *api.Pod) error {
var retErr error
ports := make(map[hostport]closeable)
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort <= 0 {
// Ignore
continue
}
hp := hostport{
port: port.HostPort,
protocol: strings.ToLower(string(port.Protocol)),
}
socket, err := h.portOpener(&hp)
if err != nil {
retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, kubecontainer.GetPodFullName(pod), err)
break
}
ports[hp] = socket
}
if retErr != nil {
break
}
}
// If we encountered any error, close all hostports that were just opened.
if retErr != nil {
for hp, socket := range ports {
if err := socket.Close(); err != nil {
glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, kubecontainer.GetPodFullName(pod), err)
}
}
return retErr
}
for hostPort, socket := range ports {
h.hostPortMap[hostPort] = socket
}
return nil
}
// gatherAllHostports returns all hostports that should be present on the node,
// given the list of pods running on that node and ignoring host network
// pods (which don't need hostport <-> container port mapping).
func gatherAllHostports(runningPods []*RunningPod) (map[api.ContainerPort]targetPod, error) {
podHostportMap := make(map[api.ContainerPort]targetPod)
for _, r := range runningPods {
if r.IP.To4() == nil {
return nil, fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod))
}
// should not handle hostports for hostnetwork pods
if r.Pod.Spec.SecurityContext != nil && r.Pod.Spec.SecurityContext.HostNetwork {
continue
}
for _, container := range r.Pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort != 0 {
podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(r.Pod), podIP: r.IP.String()}
}
}
}
}
return podHostportMap, nil
}
// Join all words with spaces, terminate with newline and write to buf.
func writeLine(buf *bytes.Buffer, words ...string) {
buf.WriteString(strings.Join(words, " ") + "\n")
}
// hostportChainName takes the ContainerPort for a pod and returns the associated
// iptables chain. This is computed by hashing (sha256) the port and pod name,
// encoding to base32, truncating to 16 characters, and prepending the
// "KUBE-HP-" prefix. We do this because iptables chain names must be <= 28
// chars long, and the longer they are the harder they are to read.
func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Chain {
hash := sha256.Sum256([]byte(string(cp.HostPort) + string(cp.Protocol) + podFullName))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16])
}
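// For example (illustrative), TCP hostport 4567 on pod "test-pod_default"
// yields a chain named "KUBE-HP-" plus the first 16 base32 characters of the
// sha256 digest; the exact suffix depends on the hash.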
// OpenPodHostportsAndSync opens hostports for a new pod, gathers all hostports
// on the node, sets up iptables rules to enable them, and finally cleans up stale hostports.
func (h *handler) OpenPodHostportsAndSync(newPod *api.Pod, natInterfaceName string, runningPods []*RunningPod) error {
// try to open pod host port if specified
if err := h.openHostports(newPod); err != nil {
return err
}
return h.SyncHostports(natInterfaceName, runningPods)
}
// SyncHostports gathers all hostports on the node, sets up iptables rules to enable them, and finally cleans up stale hostports.
func (h *handler) SyncHostports(natInterfaceName string, runningPods []*RunningPod) error {
start := time.Now()
defer func() {
glog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
}()
containerPortMap, err := gatherAllHostports(runningPods)
if err != nil {
return err
}
glog.V(4).Info("Ensuring kubelet hostport chains")
// Ensure kubeHostportChain
if _, err := h.iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err)
}
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
args := []string{"-m", "comment", "--comment", "kube hostport portals",
"-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(kubeHostportsChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := h.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err)
}
}
// Need to SNAT traffic from localhost
args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"}
if _, err := h.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
}
// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingNATChains := make(map[utiliptables.Chain]string)
iptablesSaveRaw, err := h.iptables.Save(utiliptables.TableNAT)
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
}
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
// Make sure we keep stats for the top-level chains, if they existed
// (which most should have because we created them above).
if chain, ok := existingNATChains[kubeHostportsChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(kubeHostportsChain))
}
// Assuming the node is running kube-proxy in iptables mode
// Reusing kube-proxy's KubeMarkMasqChain for SNAT
// TODO: let kubelet manage KubeMarkMasqChain. Other components should just be able to use it
if chain, ok := existingNATChains[iptablesproxy.KubeMarkMasqChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(iptablesproxy.KubeMarkMasqChain))
}
// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
for containerPort, target := range containerPortMap {
protocol := strings.ToLower(string(containerPort.Protocol))
hostportChain := hostportChainName(containerPort, target.podFullName)
if chain, ok := existingNATChains[hostportChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(hostportChain))
}
activeNATChains[hostportChain] = true
// Redirect to hostport chain
args := []string{
"-A", string(kubeHostportsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", containerPort.HostPort),
"-j", string(hostportChain),
}
writeLine(natRules, args...)
// If the request comes from the pod that is serving the hostport, then SNAT
args = []string{
"-A", string(hostportChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
"-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain),
}
writeLine(natRules, args...)
// Create hostport chain to DNAT traffic to its final destination.
// iptables will maintain the stats for this chain.
args = []string{
"-A", string(hostportChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
"-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, containerPort.ContainerPort),
}
writeLine(natRules, args...)
}
// Delete chains no longer in use.
for chain := range existingNATChains {
if !activeNATChains[chain] {
chainString := string(chain)
if !strings.HasPrefix(chainString, kubeHostportChainPrefix) {
// Ignore chains that aren't ours.
continue
}
// We must (as per iptables) write a chain-line for it, which has
// the nice effect of flushing the chain. Then we can remove the
// chain.
writeLine(natChains, existingNATChains[chain])
writeLine(natRules, "-X", chainString)
}
}
writeLine(natRules, "COMMIT")
natLines := append(natChains.Bytes(), natRules.Bytes()...)
glog.V(3).Infof("Restoring iptables rules: %s", natLines)
err = h.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
return fmt.Errorf("Failed to execute iptables-restore: %v", err)
}
h.cleanupHostportMap(containerPortMap)
return nil
}
func openLocalPort(hp *hostport) (closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket closeable
switch hp.protocol {
case "tcp":
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
socket = listener
case "udp":
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", hp.protocol)
}
glog.V(3).Infof("Opened local port %s", hp.String())
return socket, nil
}
// cleanupHostportMap closes obsolete hostports
func (h *handler) cleanupHostportMap(containerPortMap map[api.ContainerPort]targetPod) {
// compute hostports that are supposed to be open
currentHostports := make(map[hostport]bool)
for containerPort := range containerPortMap {
hp := hostport{
port: containerPort.HostPort,
// lowercase the protocol so the key matches what openHostports stored
protocol: strings.ToLower(string(containerPort.Protocol)),
}
currentHostports[hp] = true
}
// close and delete obsolete hostports
for hp, socket := range h.hostPortMap {
if _, ok := currentHostports[hp]; !ok {
socket.Close()
delete(h.hostPortMap, hp)
}
}
}


@@ -0,0 +1,232 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hostport
import (
"fmt"
"net"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
type fakeSocket struct {
port int32
protocol string
closed bool
}
func (f *fakeSocket) Close() error {
if f.closed {
return fmt.Errorf("Socket %d.%s already closed!", f.port, f.protocol)
}
f.closed = true
return nil
}
func openFakeSocket(hp *hostport) (closeable, error) {
return &fakeSocket{hp.port, hp.protocol, false}, nil
}
type ruleMatch struct {
hostport int
chain string
match string
}
func TestOpenPodHostports(t *testing.T) {
fakeIptables := NewFakeIptables()
h := &handler{
hostPortMap: make(map[hostport]closeable),
iptables: fakeIptables,
portOpener: openFakeSocket,
}
tests := []struct {
pod *api.Pod
ip string
matches []*ruleMatch
}{
// New pod that we are going to add
{
&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test-pod",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{{
Ports: []api.ContainerPort{{
HostPort: 4567,
ContainerPort: 80,
Protocol: api.ProtocolTCP,
}, {
HostPort: 5678,
ContainerPort: 81,
Protocol: api.ProtocolUDP,
}},
}},
},
},
"10.1.1.2",
[]*ruleMatch{
{
-1,
"KUBE-HOSTPORTS",
"-m comment --comment \"test-pod_default hostport 4567\" -m tcp -p tcp --dport 4567",
},
{
4567,
"",
"-m comment --comment \"test-pod_default hostport 4567\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ",
},
{
4567,
"",
"-m comment --comment \"test-pod_default hostport 4567\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.2:80",
},
{
-1,
"KUBE-HOSTPORTS",
"-m comment --comment \"test-pod_default hostport 5678\" -m udp -p udp --dport 5678",
},
{
5678,
"",
"-m comment --comment \"test-pod_default hostport 5678\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ",
},
{
5678,
"",
"-m comment --comment \"test-pod_default hostport 5678\" -m udp -p udp -j DNAT --to-destination 10.1.1.2:81",
},
},
},
// Already running pod
{
&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "another-test-pod",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{{
Ports: []api.ContainerPort{{
HostPort: 123,
ContainerPort: 654,
Protocol: api.ProtocolTCP,
}},
}},
},
},
"10.1.1.5",
[]*ruleMatch{
{
-1,
"KUBE-HOSTPORTS",
"-m comment --comment \"another-test-pod_default hostport 123\" -m tcp -p tcp --dport 123",
},
{
123,
"",
"-m comment --comment \"another-test-pod_default hostport 123\" -s 10.1.1.5/32 -j KUBE-MARK-MASQ",
},
{
123,
"",
"-m comment --comment \"another-test-pod_default hostport 123\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.5:654",
},
},
},
}
runningPods := make([]*RunningPod, 0)
// Fill in any match rules missing chain names
for _, test := range tests {
for _, match := range test.matches {
if match.hostport >= 0 {
found := false
for _, c := range test.pod.Spec.Containers {
for _, cp := range c.Ports {
if int(cp.HostPort) == match.hostport {
match.chain = string(hostportChainName(cp, kubecontainer.GetPodFullName(test.pod)))
found = true
break
}
}
}
if !found {
t.Fatalf("Failed to find ContainerPort for match %d/'%s'", match.hostport, match.match)
}
}
}
runningPods = append(runningPods, &RunningPod{
Pod: test.pod,
IP: net.ParseIP(test.ip),
})
}
err := h.OpenPodHostportsAndSync(tests[0].pod, "br0", runningPods)
if err != nil {
t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err)
}
// Generic rules
genericRules := []*ruleMatch{
{-1, "POSTROUTING", "-m comment --comment \"SNAT for localhost access to hostports\" -o br0 -s 127.0.0.0/8 -j MASQUERADE"},
{-1, "PREROUTING", "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS"},
{-1, "OUTPUT", "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS"},
}
for _, rule := range genericRules {
_, chain, err := fakeIptables.getChain(utiliptables.TableNAT, utiliptables.Chain(rule.chain))
if err != nil {
t.Fatalf("Expected NAT chain %s did not exist", rule.chain)
}
if !matchRule(chain, rule.match) {
t.Fatalf("Expected %s chain rule match '%s' not found", rule.chain, rule.match)
}
}
// Pod rules
for _, test := range tests {
for _, match := range test.matches {
// Ensure chain exists
_, chain, err := fakeIptables.getChain(utiliptables.TableNAT, utiliptables.Chain(match.chain))
if err != nil {
t.Fatalf("Expected NAT chain %s did not exist", match.chain)
}
if !matchRule(chain, match.match) {
t.Fatalf("Expected NAT chain %s rule containing '%s' not found", match.chain, match.match)
}
}
}
}
func matchRule(chain *fakeChain, match string) bool {
for _, rule := range chain.rules {
if strings.Contains(rule, match) {
return true
}
}
return false
}


@@ -0,0 +1,45 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)
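// fakeHandler is a no-op HostportHandler for tests: it performs no iptables or
// socket manipulation and only validates that each running pod has an IPv4 address.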
type fakeHandler struct{}
func NewFakeHostportHandler() hostport.HostportHandler {
return &fakeHandler{}
}
func (h *fakeHandler) OpenPodHostportsAndSync(newPod *api.Pod, natInterfaceName string, runningPods []*hostport.RunningPod) error {
return h.SyncHostports(natInterfaceName, runningPods)
}
func (h *fakeHandler) SyncHostports(natInterfaceName string, runningPods []*hostport.RunningPod) error {
for _, r := range runningPods {
if r.IP.To4() == nil {
return fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod))
}
}
return nil
}


@@ -19,9 +19,6 @@ limitations under the License.
 package kubenet

 import (
-    "bytes"
-    "crypto/sha256"
-    "encoding/base32"
     "fmt"
     "net"
     "strings"
@@ -39,13 +36,15 @@
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/dockertools"
     "k8s.io/kubernetes/pkg/kubelet/network"
-    iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
     "k8s.io/kubernetes/pkg/util/bandwidth"
     utildbus "k8s.io/kubernetes/pkg/util/dbus"
+    utilerrors "k8s.io/kubernetes/pkg/util/errors"
     utilexec "k8s.io/kubernetes/pkg/util/exec"
     utiliptables "k8s.io/kubernetes/pkg/util/iptables"
     utilsets "k8s.io/kubernetes/pkg/util/sets"
     utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
+    "k8s.io/kubernetes/pkg/kubelet/network/hostport"
 )

 const (
@@ -54,11 +53,6 @@
     DefaultCNIDir = "/opt/cni/bin"

     sysctlBridgeCallIptables = "net/bridge/bridge-nf-call-iptables"
-
-    // the hostport chain
-    kubenetHostportsChain utiliptables.Chain = "KUBENET-HOSTPORTS"
-    // prefix for kubenet hostport chains
-    kubenetHostportChainPrefix string = "KUBENET-HP-"
 )

 type kubenetNetworkPlugin struct {
@@ -75,7 +69,7 @@ type kubenetNetworkPlugin struct {
     execer utilexec.Interface
     nsenterPath string
     hairpinMode componentconfig.HairpinMode
-    hostPortMap map[hostport]closeable
+    hostportHandler hostport.HostportHandler
     iptables utiliptables.Interface
     // vendorDir is passed by kubelet network-plugin-dir parameter.
     // kubenet will search for cni binaries in DefaultCNIDir first, then continue to vendorDir.
@@ -90,11 +84,11 @@ func NewPlugin(networkPluginDir string) network.NetworkPlugin {
     iptInterface := utiliptables.New(execer, dbus, protocol)
     return &kubenetNetworkPlugin{
         podIPs: make(map[kubecontainer.ContainerID]string),
-        hostPortMap: make(map[hostport]closeable),
         MTU: 1460, //TODO: don't hardcode this
         execer: utilexec.New(),
         iptables: iptInterface,
         vendorDir: networkPluginDir,
+        hostportHandler: hostport.NewHostportHandler(),
         nonMasqueradeCIDR: "10.0.0.0/8",
     }
 }
@@ -297,38 +291,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
     return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING)
 }

-func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
-    plugin.mu.Lock()
-    defer plugin.mu.Unlock()
-
-    start := time.Now()
-    defer func() {
-        glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
-    }()
-
-    pod, ok := plugin.host.GetPodByName(namespace, name)
-    if !ok {
-        return fmt.Errorf("pod %q cannot be found", name)
-    }
-    // try to open pod host port if specified
-    hostportMap, err := plugin.openPodHostports(pod)
-    if err != nil {
-        return err
-    }
-    if len(hostportMap) > 0 {
-        // defer to decide whether to keep the host port open based on the result of SetUpPod
-        defer plugin.syncHostportMap(id, hostportMap)
-    }
-
-    ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
-    if err != nil {
-        return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
-    }
-
-    if err := plugin.Status(); err != nil {
-        return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
-    }
-
+func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *api.Pod) error {
     // Bring up container loopback interface
     if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
         return err
@@ -346,7 +309,6 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
     if ip4 == nil {
         return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4)
     }
-    plugin.podIPs[id] = ip4.String()

     // Put the container bridge into promiscuous mode to force it to accept hairpin packets.
     // TODO: Remove this once the kernel bug (#20096) is fixed.
@@ -365,22 +327,102 @@
     // initialization
     shaper := plugin.shaper()

+    ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
+    if err != nil {
+        return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
+    }
     if egress != nil || ingress != nil {
-        ipAddr := plugin.podIPs[id]
-        if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr), egress, ingress); err != nil {
+        if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
             return fmt.Errorf("Failed to add pod to shaper: %v", err)
         }
     }

-    plugin.syncHostportsRules()
+    plugin.podIPs[id] = ip4.String()
+
+    // Open any hostports the pod's containers want
+    runningPods, err := plugin.getRunningPods()
+    if err != nil {
+        return err
+    }
+    if err := plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
+    plugin.mu.Lock()
+    defer plugin.mu.Unlock()
+
+    start := time.Now()
+    defer func() {
+        glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
+    }()
+
+    pod, ok := plugin.host.GetPodByName(namespace, name)
+    if !ok {
+        return fmt.Errorf("pod %q cannot be found", name)
+    }
+
+    if err := plugin.Status(); err != nil {
+        return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
+    }
+
+    if err := plugin.setup(namespace, name, id, pod); err != nil {
+        // Make sure everything gets cleaned up on errors
+        podIP, _ := plugin.podIPs[id]
+        if err := plugin.teardown(namespace, name, id, podIP); err != nil {
+            // Not a hard error or warning
+            glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
+        }
+        return err
+    }
+
     // Need to SNAT outbound traffic from cluster
-    if err = plugin.ensureMasqRule(); err != nil {
+    if err := plugin.ensureMasqRule(); err != nil {
         glog.Errorf("Failed to ensure MASQ rule: %v", err)
     }
     return nil
 }
+
+// Tears down as much of a pod's network as it can even if errors occur. Returns
+// an aggregate error composed of all errors encountered during the teardown.
+func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID, podIP string) error {
+    errList := []error{}
+
+    if podIP != "" {
+        glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
+        // shaper wants /32
+        if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
+            // Possible bandwidth shaping wasn't enabled for this pod anyways
+            glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
+        }
+
+        delete(plugin.podIPs, id)
+    }
+
+    if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
+        // This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
+        if podIP != "" {
+            glog.Warningf("Failed to delete container from kubenet: %v", err)
+        } else {
+            errList = append(errList, err)
+        }
+    }
+
+    runningPods, err := plugin.getRunningPods()
+    if err == nil {
+        err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods)
+    }
+    if err != nil {
+        errList = append(errList, err)
+    }
+
+    return utilerrors.NewAggregate(errList)
+}
+
 func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
     plugin.mu.Lock()
     defer plugin.mu.Unlock()
@@ -395,31 +437,16 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
     }

     // no cached IP is Ok during teardown
-    podIP, hasIP := plugin.podIPs[id]
-    if hasIP {
-        glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
-        // shaper wants /32
-        if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
-            // Possible bandwidth shaping wasn't enabled for this pod anyways
-            glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
-        }
-    }
-
-    if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
-        // This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
-        if !hasIP {
-            glog.Warningf("Failed to delete container from kubenet: %v", err)
-            return nil
-        }
+    podIP, _ := plugin.podIPs[id]
+    if err := plugin.teardown(namespace, name, id, podIP); err != nil {
         return err
     }
-    delete(plugin.podIPs, id)
-
-    plugin.syncHostportsRules()

     // Need to SNAT outbound traffic from cluster
     if err := plugin.ensureMasqRule(); err != nil {
         glog.Errorf("Failed to ensure MASQ rule: %v", err)
     }
     return nil
 }
@@ -467,6 +494,39 @@ func (plugin *kubenetNetworkPlugin) Status() error {
     return nil
 }

+// Returns a list of pods running on this node and each pod's IP address. Assumes
+// PodSpecs retrieved from the runtime include the name and ID of containers in
+// each pod.
+func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, error) {
+    pods, err := plugin.host.GetRuntime().GetPods(false)
+    if err != nil {
+        return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
+    }
+    runningPods := make([]*hostport.RunningPod, 0)
+    for _, p := range pods {
+        for _, c := range p.Containers {
+            if c.Name != dockertools.PodInfraContainerName {
+                continue
+            }
+            ipString, ok := plugin.podIPs[c.ID]
+            if !ok {
+                continue
+            }
+            podIP := net.ParseIP(ipString)
+            if podIP == nil {
+                continue
+            }
+            if pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name); ok {
+                runningPods = append(runningPods, &hostport.RunningPod{
+                    Pod: pod,
+                    IP:  podIP,
+                })
+            }
+        }
+    }
+    return runningPods, nil
+}
+
 func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) {
     netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
     if err != nil {
@@ -518,332 +578,6 @@ func (plugin *kubenetNetworkPlugin) getNsenterPath() (string, error) {
     return plugin.nsenterPath, nil
 }

-type closeable interface {
-    Close() error
-}
-
-type hostport struct {
-    port     int32
-    protocol string
-}
-
-type targetPod struct {
-    podFullName string
-    podIP       string
-}
-
-func (hp *hostport) String() string {
-    return fmt.Sprintf("%s:%d", hp.protocol, hp.port)
-}
-
-//openPodHostports opens all hostport for pod and returns the map of hostport and socket
-func (plugin *kubenetNetworkPlugin) openPodHostports(pod *api.Pod) (map[hostport]closeable, error) {
-    var retErr error
-    hostportMap := make(map[hostport]closeable)
-    for _, container := range pod.Spec.Containers {
-        for _, port := range container.Ports {
-            if port.HostPort <= 0 {
-                // Ignore
-                continue
-            }
-            hp := hostport{
-                port:     port.HostPort,
-                protocol: strings.ToLower(string(port.Protocol)),
-            }
-            socket, err := openLocalPort(&hp)
-            if err != nil {
-                retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, kubecontainer.GetPodFullName(pod), err)
-                break
-            }
-            hostportMap[hp] = socket
-        }
-        if retErr != nil {
-            break
-        }
-    }
-    // If encounter any error, close all hostports that just got opened.
-    if retErr != nil {
-        for hp, socket := range hostportMap {
-            if err := socket.Close(); err != nil {
-                glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, kubecontainer.GetPodFullName(pod), err)
-            }
-        }
-    }
-    return hostportMap, retErr
-}
-
-//syncHostportMap syncs newly opened hostports to kubenet on successful pod setup. If pod setup failed, then clean up.
-func (plugin *kubenetNetworkPlugin) syncHostportMap(id kubecontainer.ContainerID, hostportMap map[hostport]closeable) {
-    // if pod ip cannot be retrieved from podCIDR, then assume pod setup failed.
-    if _, ok := plugin.podIPs[id]; !ok {
-        for hp, socket := range hostportMap {
-            err := socket.Close()
-            if err != nil {
-                glog.Errorf("Failed to close socket for hostport %v", hp)
-            }
-        }
-        return
-    }
-    // add newly opened hostports
-    for hp, socket := range hostportMap {
-        plugin.hostPortMap[hp] = socket
-    }
-}
-
-// gatherAllHostports returns all hostports that should be presented on node
-func (plugin *kubenetNetworkPlugin) gatherAllHostports() (map[api.ContainerPort]targetPod, error) {
-    podHostportMap := make(map[api.ContainerPort]targetPod)
-    pods, err := plugin.host.GetRuntime().GetPods(false)
-    if err != nil {
-        return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
-    }
-    for _, p := range pods {
-        var podInfraContainerId kubecontainer.ContainerID
-        for _, c := range p.Containers {
-            if c.Name == dockertools.PodInfraContainerName {
-                podInfraContainerId = c.ID
-                break
-            }
-        }
-        // Assuming if kubenet has the pod's ip, the pod is alive and its host port should be presented.
-        podIP, ok := plugin.podIPs[podInfraContainerId]
-        if !ok {
-            // The POD has been delete. Ignore
-            continue
-        }
-        // Need the complete api.Pod object
-        pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name)
-        // kubenet should not handle hostports for hostnetwork pods
-        if ok && !pod.Spec.SecurityContext.HostNetwork {
-            for _, container := range pod.Spec.Containers {
-                for _, port := range container.Ports {
-                    if port.HostPort != 0 {
-                        podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP}
-                    }
-                }
-            }
-        }
-    }
-    return podHostportMap, nil
-}
-
-// Join all words with spaces, terminate with newline and write to buf.
-func writeLine(buf *bytes.Buffer, words ...string) {
-    buf.WriteString(strings.Join(words, " ") + "\n")
-}
-
-//hostportChainName takes containerPort for a pod and returns associated iptables chain.
-// This is computed by hashing (sha256)
-// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
-// this because Iptables Chain Names must be <= 28 chars long, and the longer
-// they are the harder they are to read.
-func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Chain {
-    hash := sha256.Sum256([]byte(string(cp.HostPort) + string(cp.Protocol) + podFullName))
-    encoded := base32.StdEncoding.EncodeToString(hash[:])
-    return utiliptables.Chain(kubenetHostportChainPrefix + encoded[:16])
-}
-
-// syncHostportsRules gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports
-func (plugin *kubenetNetworkPlugin) syncHostportsRules() {
-    start := time.Now()
-    defer func() {
-        glog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
-    }()
-
-    containerPortMap, err := plugin.gatherAllHostports()
-    if err != nil {
-        glog.Errorf("Fail to get hostports: %v", err)
-        return
-    }
-
-    glog.V(4).Info("Ensuring kubenet hostport chains")
-    // Ensure kubenetHostportChain
-    if _, err := plugin.iptables.EnsureChain(utiliptables.TableNAT, kubenetHostportsChain); err != nil {
-        glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubenetHostportsChain, err)
-        return
-    }
-    tableChainsNeedJumpServices := []struct {
-        table utiliptables.Table
-        chain utiliptables.Chain
-    }{
-        {utiliptables.TableNAT, utiliptables.ChainOutput},
-        {utiliptables.TableNAT, utiliptables.ChainPrerouting},
-    }
-    args := []string{"-m", "comment", "--comment", "kubenet hostport portals",
-        "-m", "addrtype", "--dst-type", "LOCAL",
-        "-j", string(kubenetHostportsChain)}
-    for _, tc := range tableChainsNeedJumpServices {
-        if _, err := plugin.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
-            glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubenetHostportsChain, err)
-            return
-        }
-    }
-    // Need to SNAT traffic from localhost
-    args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", BridgeName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"}
-    if _, err := plugin.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
-        glog.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
-        return
-    }
-
-    // Get iptables-save output so we can check for existing chains and rules.
-    // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
-    existingNATChains := make(map[utiliptables.Chain]string)
-    iptablesSaveRaw, err := plugin.iptables.Save(utiliptables.TableNAT)
-    if err != nil { // if we failed to get any rules
-        glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
-    } else { // otherwise parse the output
-        existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
-    }
-
-    natChains := bytes.NewBuffer(nil)
-    natRules := bytes.NewBuffer(nil)
-    writeLine(natChains, "*nat")
-    // Make sure we keep stats for the top-level chains, if they existed
-    // (which most should have because we created them above).
-    if chain, ok := existingNATChains[kubenetHostportsChain]; ok {
-        writeLine(natChains, chain)
-    } else {
-        writeLine(natChains, utiliptables.MakeChainLine(kubenetHostportsChain))
-    }
-    // Assuming the node is running kube-proxy in iptables mode
-    // Reusing kube-proxy's KubeMarkMasqChain for SNAT
-    // TODO: let kubelet manage KubeMarkMasqChain. Other components should just be able to use it
-    if chain, ok := existingNATChains[iptablesproxy.KubeMarkMasqChain]; ok {
-        writeLine(natChains, chain)
-    } else {
-        writeLine(natChains, utiliptables.MakeChainLine(iptablesproxy.KubeMarkMasqChain))
-    }
-
-    // Accumulate NAT chains to keep.
-    activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
-
-    for containerPort, target := range containerPortMap {
-        protocol := strings.ToLower(string(containerPort.Protocol))
-        hostportChain := hostportChainName(containerPort, target.podFullName)
-        if chain, ok := existingNATChains[hostportChain]; ok {
-            writeLine(natChains, chain)
-        } else {
-            writeLine(natChains, utiliptables.MakeChainLine(hostportChain))
-        }
-        activeNATChains[hostportChain] = true
-
-        // Redirect to hostport chain
-        args := []string{
-            "-A", string(kubenetHostportsChain),
-            "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
-            "-m", protocol, "-p", protocol,
-            "--dport", fmt.Sprintf("%d", containerPort.HostPort),
-            "-j", string(hostportChain),
-        }
-        writeLine(natRules, args...)
-
-        // If the request comes from the pod that is serving the hostport, then SNAT
-        args = []string{
-            "-A", string(hostportChain),
-            "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
-            "-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain),
-        }
-        writeLine(natRules, args...)
-
-        // Create hostport chain to DNAT traffic to final destination
-        // Iptables will maintained the stats for this chain
-        args = []string{
-            "-A", string(hostportChain),
-            "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
-            "-m", protocol, "-p", protocol,
-            "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, containerPort.ContainerPort),
-        }
-        writeLine(natRules, args...)
-    }
-
-    // Delete chains no longer in use.
-    for chain := range existingNATChains {
-        if !activeNATChains[chain] {
-            chainString := string(chain)
-            if !strings.HasPrefix(chainString, kubenetHostportChainPrefix) {
-                // Ignore chains that aren't ours.
-                continue
-            }
-            // We must (as per iptables) write a chain-line for it, which has
-            // the nice effect of flushing the chain. Then we can remove the
-            // chain.
-            writeLine(natChains, existingNATChains[chain])
-            writeLine(natRules, "-X", chainString)
-        }
-    }
-    writeLine(natRules, "COMMIT")
-
-    natLines := append(natChains.Bytes(), natRules.Bytes()...)
-    glog.V(3).Infof("Restoring iptables rules: %s", natLines)
-    err = plugin.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
-    if err != nil {
-        glog.Errorf("Failed to execute iptables-restore: %v", err)
-        return
-    }
-
-    plugin.cleanupHostportMap(containerPortMap)
-}
-
-func openLocalPort(hp *hostport) (closeable, error) {
-    // For ports on node IPs, open the actual port and hold it, even though we
-    // use iptables to redirect traffic.
-    // This ensures a) that it's safe to use that port and b) that (a) stays
-    // true. The risk is that some process on the node (e.g. sshd or kubelet)
-    // is using a port and we give that same port out to a Service. That would
-    // be bad because iptables would silently claim the traffic but the process
-    // would never know.
-    // NOTE: We should not need to have a real listen()ing socket - bind()
-    // should be enough, but I can't figure out a way to e2e test without
-    // it. Tools like 'ss' and 'netstat' do not show sockets that are
-    // bind()ed but not listen()ed, and at least the default debian netcat
-    // has no way to avoid about 10 seconds of retries.
-    var socket closeable
-    switch hp.protocol {
-    case "tcp":
-        listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port))
-        if err != nil {
-            return nil, err
-        }
-        socket = listener
-    case "udp":
-        addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port))
-        if err != nil {
-            return nil, err
-        }
-        conn, err := net.ListenUDP("udp", addr)
-        if err != nil {
-            return nil, err
-        }
-        socket = conn
-    default:
-        return nil, fmt.Errorf("unknown protocol %q", hp.protocol)
-    }
-    glog.V(2).Infof("Opened local port %s", hp.String())
-    return socket, nil
-}
-
-// cleanupHostportMap closes obsolete hostports
-func (plugin *kubenetNetworkPlugin) cleanupHostportMap(containerPortMap map[api.ContainerPort]targetPod) {
-    // compute hostports that are supposed to be open
-    currentHostports := make(map[hostport]bool)
-    for containerPort := range containerPortMap {
-        hp := hostport{
-            port:     containerPort.HostPort,
-            protocol: string(containerPort.Protocol),
-        }
-        currentHostports[hp] = true
-    }
-
-    // close and delete obsolete hostports
-    for hp, socket := range plugin.hostPortMap {
-        if _, ok := currentHostports[hp]; !ok {
-            socket.Close()
-            delete(plugin.hostPortMap, hp)
-        }
-    }
-}
-
 // shaper retrieves the bandwidth shaper and, if it hasn't been fetched before,
 // initializes it and ensures the bridge is appropriately configured
 // This function should only be called while holding the `plugin.mu` lock


@@ -27,6 +27,7 @@ import (
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/network"
     "k8s.io/kubernetes/pkg/kubelet/network/cni/testing"
+    hostporttest "k8s.io/kubernetes/pkg/kubelet/network/hostport/testing"
     nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
     "k8s.io/kubernetes/pkg/util/bandwidth"
     "k8s.io/kubernetes/pkg/util/exec"
@@ -137,6 +138,7 @@ func TestTeardownCallsShaper(t *testing.T) {
     kubenet.cniConfig = mockcni
     kubenet.iptables = ipttest.NewFake()
     kubenet.bandwidthShaper = fshaper
+    kubenet.hostportHandler = hostporttest.NewFakeHostportHandler()

     mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil)