From 965492fdd0a4022820e41e5d5fff06a5fd8191bf Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 6 Jun 2016 21:45:46 -0500 Subject: [PATCH 1/2] kubelet/kubenet: split hostport handling into separate module --- pkg/kubelet/network/hostport/fake_iptables.go | 335 ++++++++++++++ pkg/kubelet/network/hostport/hostport.go | 379 ++++++++++++++++ pkg/kubelet/network/hostport/hostport_test.go | 232 ++++++++++ pkg/kubelet/network/hostport/testing/fake.go | 45 ++ pkg/kubelet/network/kubenet/kubenet_linux.go | 408 +++--------------- .../network/kubenet/kubenet_linux_test.go | 2 + 6 files changed, 1047 insertions(+), 354 deletions(-) create mode 100644 pkg/kubelet/network/hostport/fake_iptables.go create mode 100644 pkg/kubelet/network/hostport/hostport.go create mode 100644 pkg/kubelet/network/hostport/hostport_test.go create mode 100644 pkg/kubelet/network/hostport/testing/fake.go diff --git a/pkg/kubelet/network/hostport/fake_iptables.go b/pkg/kubelet/network/hostport/fake_iptables.go new file mode 100644 index 00000000000..12c7f8bb618 --- /dev/null +++ b/pkg/kubelet/network/hostport/fake_iptables.go @@ -0,0 +1,335 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "bytes" + "fmt" + "net" + "strings" + + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +type fakeChain struct { + name utiliptables.Chain + rules []string +} + +type fakeTable struct { + name utiliptables.Table + chains map[string]*fakeChain +} + +type fakeIptables struct { + tables map[string]*fakeTable +} + +func NewFakeIptables() *fakeIptables { + return &fakeIptables{ + tables: make(map[string]*fakeTable, 0), + } +} + +func (f *fakeIptables) GetVersion() (string, error) { + return "1.4.21", nil +} + +func (f *fakeIptables) getTable(tableName utiliptables.Table) (*fakeTable, error) { + table, ok := f.tables[string(tableName)] + if !ok { + return nil, fmt.Errorf("Table %s does not exist", tableName) + } + return table, nil +} + +func (f *fakeIptables) getChain(tableName utiliptables.Table, chainName utiliptables.Chain) (*fakeTable, *fakeChain, error) { + table, err := f.getTable(tableName) + if err != nil { + return nil, nil, err + } + + chain, ok := table.chains[string(chainName)] + if !ok { + return table, nil, fmt.Errorf("Chain %s/%s does not exist", tableName, chainName) + } + + return table, chain, nil +} + +func (f *fakeIptables) ensureChain(tableName utiliptables.Table, chainName utiliptables.Chain) (bool, *fakeChain) { + table, chain, err := f.getChain(tableName, chainName) + if err != nil { + // either table or table+chain don't exist yet + if table == nil { + table = &fakeTable{ + name: tableName, + chains: make(map[string]*fakeChain), + } + f.tables[string(tableName)] = table + } + chain := &fakeChain{ + name: chainName, + rules: make([]string, 0), + } + table.chains[string(chainName)] = chain + return false, chain + } + return true, chain +} + +func (f *fakeIptables) EnsureChain(tableName utiliptables.Table, chainName 
utiliptables.Chain) (bool, error) { + existed, _ := f.ensureChain(tableName, chainName) + return existed, nil +} + +func (f *fakeIptables) FlushChain(tableName utiliptables.Table, chainName utiliptables.Chain) error { + _, chain, err := f.getChain(tableName, chainName) + if err != nil { + return err + } + chain.rules = make([]string, 0) + return nil +} + +func (f *fakeIptables) DeleteChain(tableName utiliptables.Table, chainName utiliptables.Chain) error { + table, _, err := f.getChain(tableName, chainName) + if err != nil { + return err + } + delete(table.chains, string(chainName)) + return nil +} + +// Returns index of rule in array; < 0 if rule is not found +func findRule(chain *fakeChain, rule string) int { + for i, candidate := range chain.rules { + if rule == candidate { + return i + } + } + return -1 +} + +func (f *fakeIptables) ensureRule(position utiliptables.RulePosition, tableName utiliptables.Table, chainName utiliptables.Chain, rule string) (bool, error) { + _, chain, err := f.getChain(tableName, chainName) + if err != nil { + _, chain = f.ensureChain(tableName, chainName) + } + + rule, err = normalizeRule(rule) + if err != nil { + return false, err + } + ruleIdx := findRule(chain, rule) + if ruleIdx >= 0 { + return true, nil + } + + if position == utiliptables.Prepend { + chain.rules = append([]string{rule}, chain.rules...) + } else if position == utiliptables.Append { + chain.rules = append(chain.rules, rule) + } else { + return false, fmt.Errorf("Unknown position argument %q", position) + } + + return false, nil +} + +func normalizeRule(rule string) (string, error) { + normalized := "" + remaining := strings.TrimSpace(rule) + for { + var end int + + if strings.HasPrefix(remaining, "--to-destination=") { + remaining = strings.Replace(remaining, "=", " ", 1) + } + + if remaining[0] == '"' { + end = strings.Index(remaining[1:], "\"") + if end < 0 { + return "", fmt.Errorf("Invalid rule syntax: mismatched quotes") + } + end += 2 + } else { + end = strings.Index(remaining, " ") + if end < 0 { + end = len(remaining) + } + } + arg := remaining[:end] + + // Normalize un-prefixed IP addresses like iptables does + if net.ParseIP(arg) != nil { + arg = arg + "/32" + } + + if len(normalized) > 0 { + normalized += " " + } + normalized += strings.TrimSpace(arg) + if len(remaining) == end { + break + } + remaining = remaining[end+1:] + } + return normalized, nil +} + +func (f *fakeIptables) EnsureRule(position utiliptables.RulePosition, tableName utiliptables.Table, chainName utiliptables.Chain, args ...string) (bool, error) { + ruleArgs := make([]string, 0) + for _, arg := range args { + // quote args with internal spaces (like comments) + if strings.Index(arg, " ") >= 0 { + arg = fmt.Sprintf("\"%s\"", arg) + } + ruleArgs = append(ruleArgs, arg) + } + return f.ensureRule(position, tableName, chainName, strings.Join(ruleArgs, " ")) +} + +func (f *fakeIptables) DeleteRule(tableName utiliptables.Table, chainName utiliptables.Chain, args ...string) error { + _, chain, err := f.getChain(tableName, chainName) + if err == nil { + rule := strings.Join(args, " ") + ruleIdx := findRule(chain, rule) + if ruleIdx < 0 { + return nil + } + chain.rules = append(chain.rules[:ruleIdx], chain.rules[ruleIdx+1:]...) 
+ } + return nil +} + +func (f *fakeIptables) IsIpv6() bool { + return false +} + +func saveChain(chain *fakeChain, data *bytes.Buffer) { + for _, rule := range chain.rules { + data.WriteString(fmt.Sprintf("-A %s %s\n", chain.name, rule)) + } +} + +func (f *fakeIptables) Save(tableName utiliptables.Table) ([]byte, error) { + table, err := f.getTable(tableName) + if err != nil { + return nil, err + } + + data := bytes.NewBuffer(nil) + data.WriteString(fmt.Sprintf("*%s\n", table.name)) + + rules := bytes.NewBuffer(nil) + for _, chain := range table.chains { + data.WriteString(fmt.Sprintf(":%s - [0:0]\n", string(chain.name))) + saveChain(chain, rules) + } + data.Write(rules.Bytes()) + data.WriteString("COMMIT\n") + return data.Bytes(), nil +} + +func (f *fakeIptables) SaveAll() ([]byte, error) { + data := bytes.NewBuffer(nil) + for _, table := range f.tables { + tableData, err := f.Save(table.name) + if err != nil { + return nil, err + } + if _, err = data.Write(tableData); err != nil { + return nil, err + } + } + return data.Bytes(), nil +} + +func (f *fakeIptables) restore(restoreTableName utiliptables.Table, data []byte, flush utiliptables.FlushFlag) error { + buf := bytes.NewBuffer(data) + var tableName utiliptables.Table + for { + line, err := buf.ReadString('\n') + if err != nil { + break + } + if line[0] == '#' { + continue + } + + line = strings.TrimSuffix(line, "\n") + if strings.HasPrefix(line, "*") { + tableName = utiliptables.Table(line[1:]) + } + if tableName != "" { + if restoreTableName != "" && restoreTableName != tableName { + continue + } + if strings.HasPrefix(line, ":") { + chainName := utiliptables.Chain(strings.Split(line[1:], " ")[0]) + if flush == utiliptables.FlushTables { + table, chain, _ := f.getChain(tableName, chainName) + if chain != nil { + delete(table.chains, string(chainName)) + } + } + _, _ = f.ensureChain(tableName, chainName) + } else if strings.HasPrefix(line, "-A") { + parts := strings.Split(line, " ") + if len(parts) < 3 { + return fmt.Errorf("Invalid iptables rule '%s'", line) + } + chainName := utiliptables.Chain(parts[1]) + rule := strings.TrimPrefix(line, fmt.Sprintf("-A %s ", chainName)) + _, err := f.ensureRule(utiliptables.Append, tableName, chainName, rule) + if err != nil { + return err + } + } else if strings.HasPrefix(line, "-X") { + parts := strings.Split(line, " ") + if len(parts) < 3 { + return fmt.Errorf("Invalid iptables rule '%s'", line) + } + if err := f.DeleteChain(tableName, utiliptables.Chain(parts[1])); err != nil { + return err + } + } else if line == "COMMIT" { + if restoreTableName == tableName { + return nil + } + tableName = "" + } + } + } + + return nil +} + +func (f *fakeIptables) Restore(tableName utiliptables.Table, data []byte, flush utiliptables.FlushFlag, counters utiliptables.RestoreCountersFlag) error { + return f.restore(tableName, data, flush) +} + +func (f *fakeIptables) RestoreAll(data []byte, flush utiliptables.FlushFlag, counters utiliptables.RestoreCountersFlag) error { + return f.restore("", data, flush) +} + +func (f *fakeIptables) AddReloadFunc(reloadFunc func()) { +} + +func (f *fakeIptables) Destroy() { +} diff --git a/pkg/kubelet/network/hostport/hostport.go b/pkg/kubelet/network/hostport/hostport.go new file mode 100644 index 00000000000..23587e2fde3 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport.go @@ -0,0 +1,379 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "bytes" + "crypto/sha256" + "encoding/base32" + "fmt" + "net" + "strings" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" + utildbus "k8s.io/kubernetes/pkg/util/dbus" + utilexec "k8s.io/kubernetes/pkg/util/exec" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +const ( + // the hostport chain + kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS" + // prefix for hostport chains + kubeHostportChainPrefix string = "KUBE-HP-" +) + +type HostportHandler interface { + OpenPodHostportsAndSync(newPod *api.Pod, natInterfaceName string, runningPods []*RunningPod) error + SyncHostports(natInterfaceName string, runningPods []*RunningPod) error +} + +type RunningPod struct { + Pod *api.Pod + IP net.IP +} + +type hostportOpener func(*hostport) (closeable, error) + +type handler struct { + hostPortMap map[hostport]closeable + iptables utiliptables.Interface + portOpener hostportOpener +} + +func NewHostportHandler() HostportHandler { + iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) + return &handler{ + hostPortMap: make(map[hostport]closeable), + iptables: iptInterface, + portOpener: openLocalPort, + } +} + +type closeable interface { + Close() error +} + +type hostport struct { + port int32 + protocol string +} + +type targetPod struct { + podFullName string + podIP string +} + +func (hp *hostport) String() string { + return fmt.Sprintf("%s:%d", hp.protocol, hp.port) +} + +//openPodHostports opens all hostport for pod and returns the map of hostport and socket +func (h *handler) openHostports(pod *api.Pod) error { + var retErr error + ports := make(map[hostport]closeable) + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.HostPort <= 0 { + // Ignore + continue + } + hp := hostport{ + port: port.HostPort, + protocol: strings.ToLower(string(port.Protocol)), + } + socket, err := h.portOpener(&hp) + if err != nil { + retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, kubecontainer.GetPodFullName(pod), err) + break + } + ports[hp] = socket + } + if retErr != nil { + break + } + } + // If encounter any error, close all hostports that just got opened. + if retErr != nil { + for hp, socket := range ports { + if err := socket.Close(); err != nil { + glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, kubecontainer.GetPodFullName(pod), err) + } + } + return retErr + } + + for hostPort, socket := range ports { + h.hostPortMap[hostPort] = socket + } + + return nil +} + +// gatherAllHostports returns all hostports that should be presented on node, +// given the list of pods running on that node and ignoring host network +// pods (which don't need hostport <-> container port mapping). 
+func gatherAllHostports(runningPods []*RunningPod) (map[api.ContainerPort]targetPod, error) { + podHostportMap := make(map[api.ContainerPort]targetPod) + for _, r := range runningPods { + if r.IP.To4() == nil { + return nil, fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod)) + } + + // should not handle hostports for hostnetwork pods + if r.Pod.Spec.SecurityContext != nil && r.Pod.Spec.SecurityContext.HostNetwork { + continue + } + + for _, container := range r.Pod.Spec.Containers { + for _, port := range container.Ports { + if port.HostPort != 0 { + podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(r.Pod), podIP: r.IP.String()} + } + } + } + } + return podHostportMap, nil +} + +// Join all words with spaces, terminate with newline and write to buf. +func writeLine(buf *bytes.Buffer, words ...string) { + buf.WriteString(strings.Join(words, " ") + "\n") +} + +//hostportChainName takes containerPort for a pod and returns associated iptables chain. +// This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do +// this because Iptables Chain Names must be <= 28 chars long, and the longer +// they are the harder they are to read. +func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Chain { + hash := sha256.Sum256([]byte(string(cp.HostPort) + string(cp.Protocol) + podFullName)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16]) +} + +// OpenPodHostportsAndSync opens hostports for a new pod, gathers all hostports on +// node, sets up iptables rules enable them. And finally clean up stale hostports +func (h *handler) OpenPodHostportsAndSync(newPod *api.Pod, natInterfaceName string, runningPods []*RunningPod) error { + // try to open pod host port if specified + if err := h.openHostports(newPod); err != nil { + return err + } + + return h.SyncHostports(natInterfaceName, runningPods) +} + +// SyncHostports gathers all hostports on node and setup iptables rules enable them. 
And finally clean up stale hostports +func (h *handler) SyncHostports(natInterfaceName string, runningPods []*RunningPod) error { + start := time.Now() + defer func() { + glog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) + }() + + containerPortMap, err := gatherAllHostports(runningPods) + if err != nil { + return err + } + + glog.V(4).Info("Ensuring kubelet hostport chains") + // Ensure kubeHostportChain + if _, err := h.iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err) + } + tableChainsNeedJumpServices := []struct { + table utiliptables.Table + chain utiliptables.Chain + }{ + {utiliptables.TableNAT, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainPrerouting}, + } + args := []string{"-m", "comment", "--comment", "kube hostport portals", + "-m", "addrtype", "--dst-type", "LOCAL", + "-j", string(kubeHostportsChain)} + for _, tc := range tableChainsNeedJumpServices { + if _, err := h.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err) + } + } + // Need to SNAT traffic from localhost + args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} + if _, err := h.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) + } + + // Get iptables-save output so we can check for existing chains and rules. + // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore + existingNATChains := make(map[utiliptables.Chain]string) + iptablesSaveRaw, err := h.iptables.Save(utiliptables.TableNAT) + if err != nil { // if we failed to get any rules + glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) + } else { // otherwise parse the output + existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) + } + + natChains := bytes.NewBuffer(nil) + natRules := bytes.NewBuffer(nil) + writeLine(natChains, "*nat") + // Make sure we keep stats for the top-level chains, if they existed + // (which most should have because we created them above). + if chain, ok := existingNATChains[kubeHostportsChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(kubeHostportsChain)) + } + // Assuming the node is running kube-proxy in iptables mode + // Reusing kube-proxy's KubeMarkMasqChain for SNAT + // TODO: let kubelet manage KubeMarkMasqChain. Other components should just be able to use it + if chain, ok := existingNATChains[iptablesproxy.KubeMarkMasqChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(iptablesproxy.KubeMarkMasqChain)) + } + + // Accumulate NAT chains to keep. 
+ activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set + + for containerPort, target := range containerPortMap { + protocol := strings.ToLower(string(containerPort.Protocol)) + hostportChain := hostportChainName(containerPort, target.podFullName) + if chain, ok := existingNATChains[hostportChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(hostportChain)) + } + + activeNATChains[hostportChain] = true + + // Redirect to hostport chain + args := []string{ + "-A", string(kubeHostportsChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), + "-m", protocol, "-p", protocol, + "--dport", fmt.Sprintf("%d", containerPort.HostPort), + "-j", string(hostportChain), + } + writeLine(natRules, args...) + + // If the request comes from the pod that is serving the hostport, then SNAT + args = []string{ + "-A", string(hostportChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), + "-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain), + } + writeLine(natRules, args...) + + // Create hostport chain to DNAT traffic to final destination + // Iptables will maintained the stats for this chain + args = []string{ + "-A", string(hostportChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), + "-m", protocol, "-p", protocol, + "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, containerPort.ContainerPort), + } + writeLine(natRules, args...) + } + + // Delete chains no longer in use. + for chain := range existingNATChains { + if !activeNATChains[chain] { + chainString := string(chain) + if !strings.HasPrefix(chainString, kubeHostportChainPrefix) { + // Ignore chains that aren't ours. + continue + } + // We must (as per iptables) write a chain-line for it, which has + // the nice effect of flushing the chain. Then we can remove the + // chain. + writeLine(natChains, existingNATChains[chain]) + writeLine(natRules, "-X", chainString) + } + } + writeLine(natRules, "COMMIT") + + natLines := append(natChains.Bytes(), natRules.Bytes()...) + glog.V(3).Infof("Restoring iptables rules: %s", natLines) + err = h.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + if err != nil { + return fmt.Errorf("Failed to execute iptables-restore: %v", err) + } + + h.cleanupHostportMap(containerPortMap) + return nil +} + +func openLocalPort(hp *hostport) (closeable, error) { + // For ports on node IPs, open the actual port and hold it, even though we + // use iptables to redirect traffic. + // This ensures a) that it's safe to use that port and b) that (a) stays + // true. The risk is that some process on the node (e.g. sshd or kubelet) + // is using a port and we give that same port out to a Service. That would + // be bad because iptables would silently claim the traffic but the process + // would never know. + // NOTE: We should not need to have a real listen()ing socket - bind() + // should be enough, but I can't figure out a way to e2e test without + // it. Tools like 'ss' and 'netstat' do not show sockets that are + // bind()ed but not listen()ed, and at least the default debian netcat + // has no way to avoid about 10 seconds of retries. 
+ var socket closeable + switch hp.protocol { + case "tcp": + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port)) + if err != nil { + return nil, err + } + socket = listener + case "udp": + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port)) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + socket = conn + default: + return nil, fmt.Errorf("unknown protocol %q", hp.protocol) + } + glog.V(3).Infof("Opened local port %s", hp.String()) + return socket, nil +} + +// cleanupHostportMap closes obsolete hostports +func (h *handler) cleanupHostportMap(containerPortMap map[api.ContainerPort]targetPod) { + // compute hostports that are supposed to be open + currentHostports := make(map[hostport]bool) + for containerPort := range containerPortMap { + hp := hostport{ + port: containerPort.HostPort, + protocol: string(containerPort.Protocol), + } + currentHostports[hp] = true + } + + // close and delete obsolete hostports + for hp, socket := range h.hostPortMap { + if _, ok := currentHostports[hp]; !ok { + socket.Close() + delete(h.hostPortMap, hp) + } + } +} diff --git a/pkg/kubelet/network/hostport/hostport_test.go b/pkg/kubelet/network/hostport/hostport_test.go new file mode 100644 index 00000000000..5b8159c3ef0 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport_test.go @@ -0,0 +1,232 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package hostport + +import ( + "fmt" + "net" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +type fakeSocket struct { + port int32 + protocol string + closed bool +} + +func (f *fakeSocket) Close() error { + if f.closed { + return fmt.Errorf("Socket %q.%s already closed!", f.port, f.protocol) + } + f.closed = true + return nil +} + +func openFakeSocket(hp *hostport) (closeable, error) { + return &fakeSocket{hp.port, hp.protocol, false}, nil +} + +type ruleMatch struct { + hostport int + chain string + match string +} + +func TestOpenPodHostports(t *testing.T) { + fakeIptables := NewFakeIptables() + + h := &handler{ + hostPortMap: make(map[hostport]closeable), + iptables: fakeIptables, + portOpener: openFakeSocket, + } + + tests := []struct { + pod *api.Pod + ip string + matches []*ruleMatch + }{ + // New pod that we are going to add + { + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "test-pod", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Ports: []api.ContainerPort{{ + HostPort: 4567, + ContainerPort: 80, + Protocol: api.ProtocolTCP, + }, { + HostPort: 5678, + ContainerPort: 81, + Protocol: api.ProtocolUDP, + }}, + }}, + }, + }, + "10.1.1.2", + []*ruleMatch{ + { + -1, + "KUBE-HOSTPORTS", + "-m comment --comment \"test-pod_default hostport 4567\" -m tcp -p tcp --dport 4567", + }, + { + 4567, + "", + "-m comment --comment \"test-pod_default hostport 4567\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ", + }, + { + 4567, + "", + "-m comment --comment \"test-pod_default hostport 4567\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.2:80", + }, + { + -1, + "KUBE-HOSTPORTS", + "-m comment --comment \"test-pod_default hostport 5678\" -m udp -p udp --dport 5678", + }, + { + 5678, + "", + "-m comment --comment \"test-pod_default hostport 5678\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ", + }, + { + 5678, + "", + "-m comment --comment \"test-pod_default hostport 5678\" -m udp -p udp -j DNAT --to-destination 10.1.1.2:81", + }, + }, + }, + // Already running pod + { + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "another-test-pod", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Ports: []api.ContainerPort{{ + HostPort: 123, + ContainerPort: 654, + Protocol: api.ProtocolTCP, + }}, + }}, + }, + }, + "10.1.1.5", + []*ruleMatch{ + { + -1, + "KUBE-HOSTPORTS", + "-m comment --comment \"another-test-pod_default hostport 123\" -m tcp -p tcp --dport 123", + }, + { + 123, + "", + "-m comment --comment \"another-test-pod_default hostport 123\" -s 10.1.1.5/32 -j KUBE-MARK-MASQ", + }, + { + 123, + "", + "-m comment --comment \"another-test-pod_default hostport 123\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.5:654", + }, + }, + }, + } + + runningPods := make([]*RunningPod, 0) + + // Fill in any match rules missing chain names + for _, test := range tests { + for _, match := range test.matches { + if match.hostport >= 0 { + found := false + for _, c := range test.pod.Spec.Containers { + for _, cp := range c.Ports { + if int(cp.HostPort) == match.hostport { + match.chain = string(hostportChainName(cp, kubecontainer.GetPodFullName(test.pod))) + found = true + break + } + } + } + if !found { + t.Fatalf("Failed to find ContainerPort for match %d/'%s'", match.hostport, match.match) + } + } + } + runningPods = append(runningPods, &RunningPod{ + Pod: test.pod, + IP: net.ParseIP(test.ip), + }) + } + + err := 
h.OpenPodHostportsAndSync(tests[0].pod, "br0", runningPods) + if err != nil { + t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err) + } + + // Generic rules + genericRules := []*ruleMatch{ + {-1, "POSTROUTING", "-m comment --comment \"SNAT for localhost access to hostports\" -o br0 -s 127.0.0.0/8 -j MASQUERADE"}, + {-1, "PREROUTING", "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS"}, + {-1, "OUTPUT", "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS"}, + } + + for _, rule := range genericRules { + _, chain, err := fakeIptables.getChain(utiliptables.TableNAT, utiliptables.Chain(rule.chain)) + if err != nil { + t.Fatalf("Expected NAT chain %s did not exist", rule.chain) + } + if !matchRule(chain, rule.match) { + t.Fatalf("Expected %s chain rule match '%s' not found", rule.chain, rule.match) + } + } + + // Pod rules + for _, test := range tests { + for _, match := range test.matches { + // Ensure chain exists + _, chain, err := fakeIptables.getChain(utiliptables.TableNAT, utiliptables.Chain(match.chain)) + if err != nil { + t.Fatalf("Expected NAT chain %s did not exist", match.chain) + } + if !matchRule(chain, match.match) { + t.Fatalf("Expected NAT chain %s rule containing '%s' not found", match.chain, match.match) + } + } + } +} + +func matchRule(chain *fakeChain, match string) bool { + for _, rule := range chain.rules { + if strings.Contains(rule, match) { + return true + } + } + return false +} diff --git a/pkg/kubelet/network/hostport/testing/fake.go b/pkg/kubelet/network/hostport/testing/fake.go new file mode 100644 index 00000000000..d2032741375 --- /dev/null +++ b/pkg/kubelet/network/hostport/testing/fake.go @@ -0,0 +1,45 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/network/hostport" +) + +type fakeHandler struct{} + +func NewFakeHostportHandler() hostport.HostportHandler { + return &fakeHandler{} +} + +func (h *fakeHandler) OpenPodHostportsAndSync(newPod *api.Pod, natInterfaceName string, runningPods []*hostport.RunningPod) error { + return h.SyncHostports(natInterfaceName, runningPods) +} + +func (h *fakeHandler) SyncHostports(natInterfaceName string, runningPods []*hostport.RunningPod) error { + for _, r := range runningPods { + if r.IP.To4() == nil { + return fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod)) + } + } + + return nil +} diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index bdc8baabad3..3e942234f32 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -19,9 +19,6 @@ limitations under the License. 
package kubenet import ( - "bytes" - "crypto/sha256" - "encoding/base32" "fmt" "net" "strings" @@ -34,18 +31,19 @@ import ( "github.com/golang/glog" "github.com/vishvananda/netlink" "github.com/vishvananda/netlink/nl" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/componentconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" - iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/util/bandwidth" utildbus "k8s.io/kubernetes/pkg/util/dbus" + utilerrors "k8s.io/kubernetes/pkg/util/errors" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsets "k8s.io/kubernetes/pkg/util/sets" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" + + "k8s.io/kubernetes/pkg/kubelet/network/hostport" ) const ( @@ -54,11 +52,6 @@ const ( DefaultCNIDir = "/opt/cni/bin" sysctlBridgeCallIptables = "net/bridge/bridge-nf-call-iptables" - - // the hostport chain - kubenetHostportsChain utiliptables.Chain = "KUBENET-HOSTPORTS" - // prefix for kubenet hostport chains - kubenetHostportChainPrefix string = "KUBENET-HP-" ) type kubenetNetworkPlugin struct { @@ -75,7 +68,7 @@ type kubenetNetworkPlugin struct { execer utilexec.Interface nsenterPath string hairpinMode componentconfig.HairpinMode - hostPortMap map[hostport]closeable + hostportHandler hostport.HostportHandler iptables utiliptables.Interface // vendorDir is passed by kubelet network-plugin-dir parameter. // kubenet will search for cni binaries in DefaultCNIDir first, then continue to vendorDir. @@ -90,11 +83,11 @@ func NewPlugin(networkPluginDir string) network.NetworkPlugin { iptInterface := utiliptables.New(execer, dbus, protocol) return &kubenetNetworkPlugin{ podIPs: make(map[kubecontainer.ContainerID]string), - hostPortMap: make(map[hostport]closeable), MTU: 1460, //TODO: don't hardcode this execer: utilexec.New(), iptables: iptInterface, vendorDir: networkPluginDir, + hostportHandler: hostport.NewHostportHandler(), nonMasqueradeCIDR: "10.0.0.0/8", } } @@ -310,15 +303,6 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k if !ok { return fmt.Errorf("pod %q cannot be found", name) } - // try to open pod host port if specified - hostportMap, err := plugin.openPodHostports(pod) - if err != nil { - return err - } - if len(hostportMap) > 0 { - // defer to decide whether to keep the host port open based on the result of SetUpPod - defer plugin.syncHostportMap(id, hostportMap) - } ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) if err != nil { @@ -346,7 +330,6 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k if ip4 == nil { return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4) } - plugin.podIPs[id] = ip4.String() // Put the container bridge into promiscuous mode to force it to accept hairpin packets. // TODO: Remove this once the kernel bug (#20096) is fixed. 
@@ -366,19 +349,25 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k shaper := plugin.shaper() if egress != nil || ingress != nil { - ipAddr := plugin.podIPs[id] - if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr), egress, ingress); err != nil { + if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil { return fmt.Errorf("Failed to add pod to shaper: %v", err) } } - plugin.syncHostportsRules() + plugin.podIPs[id] = ip4.String() + + // Open any hostports the pod's containers want + runningPods, err := plugin.getRunningPods() + if err == nil { + err = plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods) + } // Need to SNAT outbound traffic from cluster if err = plugin.ensureMasqRule(); err != nil { glog.Errorf("Failed to ensure MASQ rule: %v", err) } - return nil + + return err } func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error { @@ -414,13 +403,17 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i } delete(plugin.podIPs, id) - plugin.syncHostportsRules() + runningPods, err := plugin.getRunningPods() + if err == nil { + err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods) + } // Need to SNAT outbound traffic from cluster if err := plugin.ensureMasqRule(); err != nil { glog.Errorf("Failed to ensure MASQ rule: %v", err) } - return nil + + return err } // TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin. @@ -467,6 +460,39 @@ func (plugin *kubenetNetworkPlugin) Status() error { return nil } +// Returns a list of pods running on this node and each pod's IP address. Assumes +// PodSpecs retrieved from the runtime include the name and ID of containers in +// each pod. 
+func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, error) { + pods, err := plugin.host.GetRuntime().GetPods(false) + if err != nil { + return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err) + } + runningPods := make([]*hostport.RunningPod, 0) + for _, p := range pods { + for _, c := range p.Containers { + if c.Name != dockertools.PodInfraContainerName { + continue + } + ipString, ok := plugin.podIPs[c.ID] + if !ok { + continue + } + podIP := net.ParseIP(ipString) + if podIP == nil { + continue + } + if pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name); ok { + runningPods = append(runningPods, &hostport.RunningPod{ + Pod: pod, + IP: podIP, + }) + } + } + } + return runningPods, nil +} + func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) { netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) if err != nil { @@ -518,332 +544,6 @@ func (plugin *kubenetNetworkPlugin) getNsenterPath() (string, error) { return plugin.nsenterPath, nil } -type closeable interface { - Close() error -} - -type hostport struct { - port int32 - protocol string -} - -type targetPod struct { - podFullName string - podIP string -} - -func (hp *hostport) String() string { - return fmt.Sprintf("%s:%d", hp.protocol, hp.port) -} - -//openPodHostports opens all hostport for pod and returns the map of hostport and socket -func (plugin *kubenetNetworkPlugin) openPodHostports(pod *api.Pod) (map[hostport]closeable, error) { - var retErr error - hostportMap := make(map[hostport]closeable) - for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.HostPort <= 0 { - // Ignore - continue - } - hp := hostport{ - port: port.HostPort, - protocol: strings.ToLower(string(port.Protocol)), - } - socket, err := openLocalPort(&hp) - if err != nil { - retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, kubecontainer.GetPodFullName(pod), err) - break - } - hostportMap[hp] = socket - } - if retErr != nil { - break - } - } - // If encounter any error, close all hostports that just got opened. - if retErr != nil { - for hp, socket := range hostportMap { - if err := socket.Close(); err != nil { - glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, kubecontainer.GetPodFullName(pod), err) - } - } - } - return hostportMap, retErr -} - -//syncHostportMap syncs newly opened hostports to kubenet on successful pod setup. If pod setup failed, then clean up. -func (plugin *kubenetNetworkPlugin) syncHostportMap(id kubecontainer.ContainerID, hostportMap map[hostport]closeable) { - // if pod ip cannot be retrieved from podCIDR, then assume pod setup failed. 
- if _, ok := plugin.podIPs[id]; !ok { - for hp, socket := range hostportMap { - err := socket.Close() - if err != nil { - glog.Errorf("Failed to close socket for hostport %v", hp) - } - } - return - } - // add newly opened hostports - for hp, socket := range hostportMap { - plugin.hostPortMap[hp] = socket - } -} - -// gatherAllHostports returns all hostports that should be presented on node -func (plugin *kubenetNetworkPlugin) gatherAllHostports() (map[api.ContainerPort]targetPod, error) { - podHostportMap := make(map[api.ContainerPort]targetPod) - pods, err := plugin.host.GetRuntime().GetPods(false) - if err != nil { - return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err) - } - for _, p := range pods { - var podInfraContainerId kubecontainer.ContainerID - for _, c := range p.Containers { - if c.Name == dockertools.PodInfraContainerName { - podInfraContainerId = c.ID - break - } - } - // Assuming if kubenet has the pod's ip, the pod is alive and its host port should be presented. - podIP, ok := plugin.podIPs[podInfraContainerId] - if !ok { - // The POD has been delete. Ignore - continue - } - // Need the complete api.Pod object - pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name) - // kubenet should not handle hostports for hostnetwork pods - if ok && !pod.Spec.SecurityContext.HostNetwork { - for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.HostPort != 0 { - podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP} - } - } - } - } - } - return podHostportMap, nil -} - -// Join all words with spaces, terminate with newline and write to buf. -func writeLine(buf *bytes.Buffer, words ...string) { - buf.WriteString(strings.Join(words, " ") + "\n") -} - -//hostportChainName takes containerPort for a pod and returns associated iptables chain. -// This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do -// this because Iptables Chain Names must be <= 28 chars long, and the longer -// they are the harder they are to read. -func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Chain { - hash := sha256.Sum256([]byte(string(cp.HostPort) + string(cp.Protocol) + podFullName)) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain(kubenetHostportChainPrefix + encoded[:16]) -} - -// syncHostportsRules gathers all hostports on node and setup iptables rules enable them. 
And finally clean up stale hostports -func (plugin *kubenetNetworkPlugin) syncHostportsRules() { - start := time.Now() - defer func() { - glog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) - }() - - containerPortMap, err := plugin.gatherAllHostports() - if err != nil { - glog.Errorf("Fail to get hostports: %v", err) - return - } - - glog.V(4).Info("Ensuring kubenet hostport chains") - // Ensure kubenetHostportChain - if _, err := plugin.iptables.EnsureChain(utiliptables.TableNAT, kubenetHostportsChain); err != nil { - glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubenetHostportsChain, err) - return - } - tableChainsNeedJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - args := []string{"-m", "comment", "--comment", "kubenet hostport portals", - "-m", "addrtype", "--dst-type", "LOCAL", - "-j", string(kubenetHostportsChain)} - for _, tc := range tableChainsNeedJumpServices { - if _, err := plugin.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { - glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubenetHostportsChain, err) - return - } - } - // Need to SNAT traffic from localhost - args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", BridgeName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} - if _, err := plugin.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - glog.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) - return - } - - // Get iptables-save output so we can check for existing chains and rules. - // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingNATChains := make(map[utiliptables.Chain]string) - iptablesSaveRaw, err := plugin.iptables.Save(utiliptables.TableNAT) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) - } - - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) - writeLine(natChains, "*nat") - // Make sure we keep stats for the top-level chains, if they existed - // (which most should have because we created them above). - if chain, ok := existingNATChains[kubenetHostportsChain]; ok { - writeLine(natChains, chain) - } else { - writeLine(natChains, utiliptables.MakeChainLine(kubenetHostportsChain)) - } - // Assuming the node is running kube-proxy in iptables mode - // Reusing kube-proxy's KubeMarkMasqChain for SNAT - // TODO: let kubelet manage KubeMarkMasqChain. Other components should just be able to use it - if chain, ok := existingNATChains[iptablesproxy.KubeMarkMasqChain]; ok { - writeLine(natChains, chain) - } else { - writeLine(natChains, utiliptables.MakeChainLine(iptablesproxy.KubeMarkMasqChain)) - } - - // Accumulate NAT chains to keep. 
- activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - - for containerPort, target := range containerPortMap { - protocol := strings.ToLower(string(containerPort.Protocol)) - hostportChain := hostportChainName(containerPort, target.podFullName) - if chain, ok := existingNATChains[hostportChain]; ok { - writeLine(natChains, chain) - } else { - writeLine(natChains, utiliptables.MakeChainLine(hostportChain)) - } - - activeNATChains[hostportChain] = true - - // Redirect to hostport chain - args := []string{ - "-A", string(kubenetHostportsChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), - "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", containerPort.HostPort), - "-j", string(hostportChain), - } - writeLine(natRules, args...) - - // If the request comes from the pod that is serving the hostport, then SNAT - args = []string{ - "-A", string(hostportChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), - "-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain), - } - writeLine(natRules, args...) - - // Create hostport chain to DNAT traffic to final destination - // Iptables will maintained the stats for this chain - args = []string{ - "-A", string(hostportChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), - "-m", protocol, "-p", protocol, - "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, containerPort.ContainerPort), - } - writeLine(natRules, args...) - } - - // Delete chains no longer in use. - for chain := range existingNATChains { - if !activeNATChains[chain] { - chainString := string(chain) - if !strings.HasPrefix(chainString, kubenetHostportChainPrefix) { - // Ignore chains that aren't ours. - continue - } - // We must (as per iptables) write a chain-line for it, which has - // the nice effect of flushing the chain. Then we can remove the - // chain. - writeLine(natChains, existingNATChains[chain]) - writeLine(natRules, "-X", chainString) - } - } - writeLine(natRules, "COMMIT") - - natLines := append(natChains.Bytes(), natRules.Bytes()...) - glog.V(3).Infof("Restoring iptables rules: %s", natLines) - err = plugin.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) - if err != nil { - glog.Errorf("Failed to execute iptables-restore: %v", err) - return - } - - plugin.cleanupHostportMap(containerPortMap) -} - -func openLocalPort(hp *hostport) (closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use iptables to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because iptables would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. 
- var socket closeable - switch hp.protocol { - case "tcp": - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - socket = listener - case "udp": - addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", hp.protocol) - } - glog.V(2).Infof("Opened local port %s", hp.String()) - return socket, nil -} - -// cleanupHostportMap closes obsolete hostports -func (plugin *kubenetNetworkPlugin) cleanupHostportMap(containerPortMap map[api.ContainerPort]targetPod) { - // compute hostports that are supposed to be open - currentHostports := make(map[hostport]bool) - for containerPort := range containerPortMap { - hp := hostport{ - port: containerPort.HostPort, - protocol: string(containerPort.Protocol), - } - currentHostports[hp] = true - } - - // close and delete obsolete hostports - for hp, socket := range plugin.hostPortMap { - if _, ok := currentHostports[hp]; !ok { - socket.Close() - delete(plugin.hostPortMap, hp) - } - } -} - // shaper retrieves the bandwidth shaper and, if it hasn't been fetched before, // initializes it and ensures the bridge is appropriately configured // This function should only be called while holding the `plugin.mu` lock diff --git a/pkg/kubelet/network/kubenet/kubenet_linux_test.go b/pkg/kubelet/network/kubenet/kubenet_linux_test.go index 051a0f2494f..02b2b93a0de 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux_test.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux_test.go @@ -27,6 +27,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/cni/testing" + hostporttest "k8s.io/kubernetes/pkg/kubelet/network/hostport/testing" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/exec" @@ -137,6 +138,7 @@ func TestTeardownCallsShaper(t *testing.T) { kubenet.cniConfig = mockcni kubenet.iptables = ipttest.NewFake() kubenet.bandwidthShaper = fshaper + kubenet.hostportHandler = hostporttest.NewFakeHostportHandler() mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil) From a519e8a403f4a9106bfe729ab685e84daea26c11 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 13 Jun 2016 17:19:04 -0500 Subject: [PATCH 2/2] kubenet: clean up networking when setup errors occur Relying on the runtime to later call cleanup is fragile, so make sure that everything gets nicely cleaned up when setup errors occur. 
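
To make the intent concrete: SetUpPod now delegates the ordered setup steps to a setup() helper and, if any step fails, immediately runs the same teardown() path that TearDownPod uses, aggregating cleanup errors but still returning the original setup error. The sketch below is only a minimal, self-contained illustration of that pattern; the type and helper names are placeholders rather than the actual kubenet code, and errors.Join stands in for utilerrors.NewAggregate.

    package main

    import (
        "errors"
        "fmt"
    )

    // plugin stands in for kubenetNetworkPlugin; podIPs mirrors its cache of
    // infra-container ID -> pod IP.
    type plugin struct {
        podIPs map[string]string
    }

    // delContainerFromNetwork stands in for the CNI DEL call.
    func (p *plugin) delContainerFromNetwork(id string) error {
        return nil
    }

    // setup performs the ordered setup steps; a failure part-way through can
    // leave state behind (an allocated IP, open hostports) that teardown must undo.
    func (p *plugin) setup(id string) error {
        p.podIPs[id] = "10.1.1.2"                       // pretend CNI assigned an address
        return errors.New("simulated hostport failure") // pretend a later step failed
    }

    // teardown undoes as much as it can and aggregates errors instead of
    // stopping at the first one (errors.Join plays the role of
    // utilerrors.NewAggregate here).
    func (p *plugin) teardown(id, podIP string) error {
        var errs []error
        if podIP != "" {
            delete(p.podIPs, id)
        }
        if err := p.delContainerFromNetwork(id); err != nil {
            errs = append(errs, err)
        }
        return errors.Join(errs...)
    }

    // SetUpPod cleans up after itself on failure instead of relying on the
    // runtime to call TearDownPod later.
    func (p *plugin) SetUpPod(id string) error {
        if err := p.setup(id); err != nil {
            if terr := p.teardown(id, p.podIPs[id]); terr != nil {
                fmt.Printf("cleanup after failed setup also failed: %v\n", terr)
            }
            return err // report the original setup failure, not the cleanup result
        }
        return nil
    }

    func main() {
        p := &plugin{podIPs: map[string]string{}}
        if err := p.SetUpPod("infra-ctr-1"); err != nil {
            fmt.Println("SetUpPod failed:", err)
        }
        fmt.Println("cached pod IPs after failed setup:", p.podIPs)
    }

Returning the original setup error rather than the cleanup result keeps the failure the runtime sees tied to the step that actually broke, while the best-effort teardown prevents leaked IPs, shaper entries, and hostport rules.
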
--- pkg/kubelet/network/kubenet/kubenet_linux.go | 132 ++++++++++++------- 1 file changed, 83 insertions(+), 49 deletions(-) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 3e942234f32..8ced3032dd6 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -31,6 +31,7 @@ import ( "github.com/golang/glog" "github.com/vishvananda/netlink" "github.com/vishvananda/netlink/nl" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/componentconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" @@ -290,29 +291,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int { return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING) } -func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { - plugin.mu.Lock() - defer plugin.mu.Unlock() - - start := time.Now() - defer func() { - glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name) - }() - - pod, ok := plugin.host.GetPodByName(namespace, name) - if !ok { - return fmt.Errorf("pod %q cannot be found", name) - } - - ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) - if err != nil { - return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) - } - - if err := plugin.Status(); err != nil { - return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) - } - +func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *api.Pod) error { // Bring up container loopback interface if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil { return err @@ -348,6 +327,10 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k // initialization shaper := plugin.shaper() + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) + if err != nil { + return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) + } if egress != nil || ingress != nil { if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil { return fmt.Errorf("Failed to add pod to shaper: %v", err) @@ -358,16 +341,86 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k // Open any hostports the pod's containers want runningPods, err := plugin.getRunningPods() - if err == nil { - err = plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods) + if err != nil { + return err + } + if err := plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods); err != nil { + return err + } + + return nil +} + +func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { + plugin.mu.Lock() + defer plugin.mu.Unlock() + + start := time.Now() + defer func() { + glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name) + }() + + pod, ok := plugin.host.GetPodByName(namespace, name) + if !ok { + return fmt.Errorf("pod %q cannot be found", name) + } + + if err := plugin.Status(); err != nil { + return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) + } + + if err := plugin.setup(namespace, name, id, pod); err != nil { + // Make sure everything gets cleaned up on errors + podIP, _ := plugin.podIPs[id] + if err := plugin.teardown(namespace, name, id, podIP); err != nil { + // Not a hard error or 
warning + glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err) + } + return err } // Need to SNAT outbound traffic from cluster - if err = plugin.ensureMasqRule(); err != nil { + if err := plugin.ensureMasqRule(); err != nil { glog.Errorf("Failed to ensure MASQ rule: %v", err) } - return err + return nil +} + +// Tears down as much of a pod's network as it can even if errors occur. Returns +// an aggregate error composed of all errors encountered during the teardown. +func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID, podIP string) error { + errList := []error{} + + if podIP != "" { + glog.V(5).Infof("Removing pod IP %s from shaper", podIP) + // shaper wants /32 + if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil { + // Possible bandwidth shaping wasn't enabled for this pod anyways + glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err) + } + + delete(plugin.podIPs, id) + } + + if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil { + // This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution. + if podIP != "" { + glog.Warningf("Failed to delete container from kubenet: %v", err) + } else { + errList = append(errList, err) + } + } + + runningPods, err := plugin.getRunningPods() + if err == nil { + err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods) + } + if err != nil { + errList = append(errList, err) + } + + return utilerrors.NewAggregate(errList) } func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error { @@ -384,36 +437,17 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i } // no cached IP is Ok during teardown - podIP, hasIP := plugin.podIPs[id] - if hasIP { - glog.V(5).Infof("Removing pod IP %s from shaper", podIP) - // shaper wants /32 - if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil { - // Possible bandwidth shaping wasn't enabled for this pod anyways - glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err) - } - } - if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil { - // This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution. - if !hasIP { - glog.Warningf("Failed to delete container from kubenet: %v", err) - return nil - } + podIP, _ := plugin.podIPs[id] + if err := plugin.teardown(namespace, name, id, podIP); err != nil { return err } - delete(plugin.podIPs, id) - - runningPods, err := plugin.getRunningPods() - if err == nil { - err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods) - } // Need to SNAT outbound traffic from cluster if err := plugin.ensureMasqRule(); err != nil { glog.Errorf("Failed to ensure MASQ rule: %v", err) } - return err + return nil } // TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.