From 8e7219cbb4d66758d9e583631c9d59f780112508 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Thu, 2 Feb 2017 19:50:37 -0800 Subject: [PATCH] add hostport manager --- pkg/kubelet/network/hostport/fake_iptables.go | 13 +- .../network/hostport/hostport_manager.go | 328 ++++++++++++++++++ .../network/hostport/hostport_manager_test.go | 197 +++++++++++ 3 files changed, 537 insertions(+), 1 deletion(-) create mode 100644 pkg/kubelet/network/hostport/hostport_manager.go create mode 100644 pkg/kubelet/network/hostport/hostport_manager_test.go diff --git a/pkg/kubelet/network/hostport/fake_iptables.go b/pkg/kubelet/network/hostport/fake_iptables.go index dad4b5cd0f1..d8c05baddc2 100644 --- a/pkg/kubelet/network/hostport/fake_iptables.go +++ b/pkg/kubelet/network/hostport/fake_iptables.go @@ -300,11 +300,22 @@ func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte, if err != nil { return err } - } else if strings.HasPrefix(line, "-X") { + } else if strings.HasPrefix(line, "-I") { parts := strings.Split(line, " ") if len(parts) < 3 { return fmt.Errorf("Invalid iptables rule '%s'", line) } + chainName := utiliptables.Chain(parts[1]) + rule := strings.TrimPrefix(line, fmt.Sprintf("-I %s ", chainName)) + _, err := f.ensureRule(utiliptables.Prepend, tableName, chainName, rule) + if err != nil { + return err + } + } else if strings.HasPrefix(line, "-X") { + parts := strings.Split(line, " ") + if len(parts) < 2 { + return fmt.Errorf("Invalid iptables rule '%s'", line) + } if err := f.DeleteChain(tableName, utiliptables.Chain(parts[1])); err != nil { return err } diff --git a/pkg/kubelet/network/hostport/hostport_manager.go b/pkg/kubelet/network/hostport/hostport_manager.go new file mode 100644 index 00000000000..5c18f8fce17 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport_manager.go @@ -0,0 +1,328 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "bytes" + "crypto/sha256" + "encoding/base32" + "fmt" + "strings" + "sync" + + "github.com/golang/glog" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" + utildbus "k8s.io/kubernetes/pkg/util/dbus" + utilexec "k8s.io/kubernetes/pkg/util/exec" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +// HostPortManager is an interface for adding and removing hostport for a given pod sandbox. +type HostPortManager interface { + // Add implements port mappings. + // id should be a unique identifier for a pod, e.g. podSandboxID. + // podPortMapping is the associated port mapping information for the pod. + // natInterfaceName is the interface that localhost used to talk to the given pod. 
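+	// Add opens the requested host ports and installs the iptables DNAT/SNAT
+	// rules that forward traffic on those ports to the pod IP.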
+ Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) error + // Remove cleans up matching port mappings + // Remove must be able to clean up port mappings without pod IP + Remove(id string, podPortMapping *PodPortMapping) error +} + +type hostportManager struct { + hostPortMap map[hostport]closeable + iptables utiliptables.Interface + portOpener hostportOpener + mu sync.Mutex +} + +func NewHostportManager() HostPortManager { + iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) + return &hostportManager{ + hostPortMap: make(map[hostport]closeable), + iptables: iptInterface, + portOpener: openLocalPort, + } +} + +func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) { + if podPortMapping == nil || podPortMapping.HostNetwork { + return nil + } + podFullName := getPodFullName(podPortMapping) + + // skip if there is no hostport needed + hostportMappings := gatherHostportMappings(podPortMapping) + if len(hostportMappings) == 0 { + return nil + } + + if podPortMapping.IP.To4() == nil { + return fmt.Errorf("invalid or missing IP of pod %s", podFullName) + } + podIP := podPortMapping.IP.String() + + if err = ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil { + return err + } + + // Ensure atomicity for port opening and iptables operations + hm.mu.Lock() + defer hm.mu.Unlock() + + // try to open hostports + ports, err := openHostports(hm.portOpener, podPortMapping) + if err != nil { + return err + } + for hostport, socket := range ports { + hm.hostPortMap[hostport] = socket + } + + natChains := bytes.NewBuffer(nil) + natRules := bytes.NewBuffer(nil) + writeLine(natChains, "*nat") + + existingChains, existingRules, err := getExistingHostportIPTablesRules(hm.iptables) + if err != nil { + // clean up opened host port if encounter any error + return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) + } + + newChains := []utiliptables.Chain{} + for _, pm := range hostportMappings { + protocol := strings.ToLower(string(pm.Protocol)) + chain := getHostportChain(id, pm) + newChains = append(newChains, chain) + + // Add new hostport chain + writeLine(natChains, utiliptables.MakeChainLine(chain)) + + // Prepend the new chain to KUBE-HOSTPORTS + // This avoids any leaking iptables rule that takes up the same port + writeLine(natRules, "-I", string(kubeHostportsChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), + "-m", protocol, "-p", protocol, "--dport", fmt.Sprintf("%d", pm.HostPort), + "-j", string(chain), + ) + + // SNAT if the traffic comes from the pod itself + writeLine(natRules, "-A", string(chain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), + "-s", podIP, + "-j", string(iptablesproxy.KubeMarkMasqChain)) + + // DNAT to the podIP:containerPort + writeLine(natRules, "-A", string(chain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), + "-m", protocol, "-p", protocol, + "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", podIP, pm.ContainerPort)) + } + + // getHostportChain should be able to provide unique hostport chain name using hash + // if there is a chain conflict or multiple Adds have been triggered for a single pod, + // filtering should be able to avoid further problem + filterChains(existingChains, newChains) + existingRules = filterRules(existingRules, newChains) + + for _, chain := range 
existingChains { + writeLine(natChains, chain) + } + for _, rule := range existingRules { + writeLine(natRules, rule) + } + writeLine(natRules, "COMMIT") + + if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil { + // clean up opened host port if encounter any error + return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) + } + return nil +} + +func (hm *hostportManager) Remove(id string, podPortMapping *PodPortMapping) (err error) { + if podPortMapping == nil || podPortMapping.HostNetwork { + return nil + } + + hostportMappings := gatherHostportMappings(podPortMapping) + if len(hostportMappings) <= 0 { + return nil + } + + // Ensure atomicity for port closing and iptables operations + hm.mu.Lock() + defer hm.mu.Unlock() + + var existingChains map[utiliptables.Chain]string + var existingRules []string + existingChains, existingRules, err = getExistingHostportIPTablesRules(hm.iptables) + if err != nil { + return err + } + + // Gather target hostport chains for removal + chainsToRemove := []utiliptables.Chain{} + for _, pm := range hostportMappings { + chainsToRemove = append(chainsToRemove, getHostportChain(id, pm)) + + // To preserve backward compatibility for k8s 1.5 or earlier. + // Need to remove hostport chains added by hostportSyncer if there is any + // TODO: remove this in 1.7 + chainsToRemove = append(chainsToRemove, hostportChainName(pm, getPodFullName(podPortMapping))) + } + + // remove rules that consists of target chains + remainingRules := filterRules(existingRules, chainsToRemove) + + // gather target hostport chains that exists in iptables-save result + existingChainsToRemove := []utiliptables.Chain{} + for _, chain := range chainsToRemove { + if _, ok := existingChains[chain]; ok { + existingChainsToRemove = append(existingChainsToRemove, chain) + } + } + + natChains := bytes.NewBuffer(nil) + natRules := bytes.NewBuffer(nil) + writeLine(natChains, "*nat") + for _, chain := range existingChains { + writeLine(natChains, chain) + } + for _, rule := range remainingRules { + writeLine(natRules, rule) + } + for _, chain := range existingChainsToRemove { + writeLine(natRules, "-X", string(chain)) + } + writeLine(natRules, "COMMIT") + + if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil { + return err + } + + // clean up opened pod host ports + return hm.closeHostports(hostportMappings) +} + +// syncIPTables executes iptables-restore with given lines +func (hm *hostportManager) syncIPTables(lines []byte) error { + glog.V(3).Infof("Restoring iptables rules: %s", lines) + err := hm.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + if err != nil { + return fmt.Errorf("Failed to execute iptables-restore: %v", err) + } + return nil +} + +// closeHostports tries to close all the listed host ports +// TODO: move closeHostports and openHostports into a common struct +func (hm *hostportManager) closeHostports(hostportMappings []*PortMapping) error { + errList := []error{} + for _, pm := range hostportMappings { + hp := portMappingToHostport(pm) + if socket, ok := hm.hostPortMap[hp]; ok { + glog.V(2).Infof("Closing host port %s", hp.String()) + if err := socket.Close(); err != nil { + errList = append(errList, fmt.Errorf("failed to close host port %s: %v", hp.String(), err)) + continue + } + delete(hm.hostPortMap, hp) + } + } + return utilerrors.NewAggregate(errList) +} + +// getHostportChain takes id, hostport and protocol for a pod and returns associated 
iptables chain. +// This is computed by hashing (sha256) then encoding to base32 and truncating with the prefix +// "KUBE-HP-". We do this because IPTables Chain Names must be <= 28 chars long, and the longer +// they are the harder they are to read. +// WARNING: Please do not change this function. Otherwise, HostportManager may not be able to +// identify existing iptables chains. +func getHostportChain(id string, pm *PortMapping) utiliptables.Chain { + hash := sha256.Sum256([]byte(id + string(pm.HostPort) + string(pm.Protocol))) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16]) +} + +// gatherHostportMappings returns all the PortMappings which has hostport for a pod +func gatherHostportMappings(podPortMapping *PodPortMapping) []*PortMapping { + mappings := []*PortMapping{} + for _, pm := range podPortMapping.PortMappings { + if pm.HostPort <= 0 { + continue + } + mappings = append(mappings, pm) + } + return mappings +} + +// getExistingHostportIPTablesRules retrieves raw data from iptables-save, parse it, +// return all the hostport related chains and rules +func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[utiliptables.Chain]string, []string, error) { + iptablesSaveRaw, err := iptables.Save(utiliptables.TableNAT) + if err != nil { // if we failed to get any rules + return nil, nil, fmt.Errorf("failed to execute iptables-save: %v", err) + } + existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) + + existingHostportChains := make(map[utiliptables.Chain]string) + existingHostportRules := []string{} + + for chain := range existingNATChains { + if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) { + existingHostportChains[chain] = existingNATChains[chain] + } + } + + for _, line := range strings.Split(string(iptablesSaveRaw), "\n") { + if strings.HasPrefix(line, fmt.Sprintf("-A %s", kubeHostportChainPrefix)) || + strings.HasPrefix(line, fmt.Sprintf("-A %s", string(kubeHostportsChain))) { + existingHostportRules = append(existingHostportRules, line) + } + } + return existingHostportChains, existingHostportRules, nil +} + +// filterRules filters input rules with input chains. Rules that did not involve any filter chain will be returned. +// The order of the input rules is important and is preserved. +func filterRules(rules []string, filters []utiliptables.Chain) []string { + filtered := []string{} + for _, rule := range rules { + skip := false + for _, filter := range filters { + if strings.Contains(rule, string(filter)) { + skip = true + break + } + } + if !skip { + filtered = append(filtered, rule) + } + } + return filtered +} + +// filterChains deletes all entries of filter chains from chain map +func filterChains(chains map[utiliptables.Chain]string, filterChains []utiliptables.Chain) { + for _, chain := range filterChains { + if _, ok := chains[chain]; ok { + delete(chains, chain) + } + } +} diff --git a/pkg/kubelet/network/hostport/hostport_manager_test.go b/pkg/kubelet/network/hostport/hostport_manager_test.go new file mode 100644 index 00000000000..3499dc2d34f --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport_manager_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api/v1" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" + "strings" +) + +func NewFakeHostportManager() HostPortManager { + return &hostportManager{ + hostPortMap: make(map[hostport]closeable), + iptables: NewFakeIPTables(), + portOpener: NewFakeSocketManager().openFakeSocket, + } +} + +func TestHostportManager(t *testing.T) { + iptables := NewFakeIPTables() + portOpener := NewFakeSocketManager() + manager := &hostportManager{ + hostPortMap: make(map[hostport]closeable), + iptables: iptables, + portOpener: portOpener.openFakeSocket, + } + + testCases := []struct { + mapping *PodPortMapping + expectError bool + }{ + { + mapping: &PodPortMapping{ + Name: "pod1", + Namespace: "ns1", + IP: net.ParseIP("10.1.1.2"), + HostNetwork: false, + PortMappings: []*PortMapping{ + { + HostPort: 8080, + ContainerPort: 80, + Protocol: v1.ProtocolTCP, + }, + { + HostPort: 8081, + ContainerPort: 81, + Protocol: v1.ProtocolUDP, + }, + }, + }, + expectError: false, + }, + { + mapping: &PodPortMapping{ + Name: "pod2", + Namespace: "ns1", + IP: net.ParseIP("10.1.1.3"), + HostNetwork: false, + PortMappings: []*PortMapping{ + { + HostPort: 8082, + ContainerPort: 80, + Protocol: v1.ProtocolTCP, + }, + { + HostPort: 8081, + ContainerPort: 81, + Protocol: v1.ProtocolUDP, + }, + }, + }, + expectError: true, + }, + { + mapping: &PodPortMapping{ + Name: "pod3", + Namespace: "ns1", + IP: net.ParseIP("10.1.1.4"), + HostNetwork: false, + PortMappings: []*PortMapping{ + { + HostPort: 8443, + ContainerPort: 443, + Protocol: v1.ProtocolTCP, + }, + }, + }, + expectError: false, + }, + } + + // Add Hostports + for _, tc := range testCases { + err := manager.Add("id", tc.mapping, "cbr0") + if tc.expectError { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + } + + // Check port opened + expectedPorts := []hostport{{8080, "tcp"}, {8081, "udp"}, {8443, "tcp"}} + openedPorts := make(map[hostport]bool) + for hp, port := range portOpener.mem { + if !port.closed { + openedPorts[hp] = true + } + } + assert.EqualValues(t, len(openedPorts), len(expectedPorts)) + for _, hp := range expectedPorts { + _, ok := openedPorts[hp] + assert.EqualValues(t, true, ok) + } + + // Check Iptables-save result after adding hostports + raw, err := iptables.Save(utiliptables.TableNAT) + assert.NoError(t, err) + + lines := strings.Split(string(raw), "\n") + expectedLines := map[string]bool{ + `*nat`: true, + `:KUBE-HOSTPORTS - [0:0]`: true, + `:OUTPUT - [0:0]`: true, + `:PREROUTING - [0:0]`: true, + `:POSTROUTING - [0:0]`: true, + `:KUBE-HP-4YVONL46AKYWSKS3 - [0:0]`: true, + `:KUBE-HP-7THKRFSEH4GIIXK7 - [0:0]`: true, + `:KUBE-HP-5N7UH5JAXCVP5UJR - [0:0]`: true, + "-A KUBE-HOSTPORTS -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp --dport 8443 -j KUBE-HP-5N7UH5JAXCVP5UJR": true, + "-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp --dport 8081 -j KUBE-HP-7THKRFSEH4GIIXK7": true, + "-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp --dport 8080 -j 
KUBE-HP-4YVONL46AKYWSKS3": true, + "-A OUTPUT -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true, + "-A PREROUTING -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true, + "-A POSTROUTING -m comment --comment \"SNAT for localhost access to hostports\" -o cbr0 -s 127.0.0.0/8 -j MASQUERADE": true, + "-A KUBE-HP-4YVONL46AKYWSKS3 -m comment --comment \"pod1_ns1 hostport 8080\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ": true, + "-A KUBE-HP-4YVONL46AKYWSKS3 -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.2:80": true, + "-A KUBE-HP-7THKRFSEH4GIIXK7 -m comment --comment \"pod1_ns1 hostport 8081\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ": true, + "-A KUBE-HP-7THKRFSEH4GIIXK7 -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp -j DNAT --to-destination 10.1.1.2:81": true, + "-A KUBE-HP-5N7UH5JAXCVP5UJR -m comment --comment \"pod3_ns1 hostport 8443\" -s 10.1.1.4/32 -j KUBE-MARK-MASQ": true, + "-A KUBE-HP-5N7UH5JAXCVP5UJR -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.4:443": true, + `COMMIT`: true, + } + for _, line := range lines { + if len(strings.TrimSpace(line)) > 0 { + _, ok := expectedLines[strings.TrimSpace(line)] + assert.EqualValues(t, true, ok) + } + } + + // Remove all added hostports + for _, tc := range testCases { + if !tc.expectError { + err := manager.Remove("id", tc.mapping) + assert.NoError(t, err) + } + } + + // Check Iptables-save result after deleting hostports + raw, err = iptables.Save(utiliptables.TableNAT) + assert.NoError(t, err) + lines = strings.Split(string(raw), "\n") + remainingChains := make(map[string]bool) + for _, line := range lines { + if strings.HasPrefix(line, ":") { + remainingChains[strings.TrimSpace(line)] = true + } + } + expectDeletedChains := []string{"KUBE-HP-4YVONL46AKYWSKS3", "KUBE-HP-7THKRFSEH4GIIXK7", "KUBE-HP-5N7UH5JAXCVP5UJR"} + for _, chain := range expectDeletedChains { + _, ok := remainingChains[chain] + assert.EqualValues(t, false, ok) + } + + // check if all ports are closed + for _, port := range portOpener.mem { + assert.EqualValues(t, true, port.closed) + } +}
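
For reviewers, a minimal, untested sketch of how a caller (for example a kubenet-style network plugin) might drive the new manager. The import path, the constructor, and the PodPortMapping/PortMapping fields are taken from this patch; the surrounding wiring, the sandbox ID, and the bridge name "cbr0" are illustrative assumptions.

package main

import (
	"log"
	"net"

	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)

func main() {
	// Manager backed by the real iptables implementation on this node.
	hm := hostport.NewHostportManager()

	// Host ports requested by a (hypothetical) pod with a known pod IP.
	mapping := &hostport.PodPortMapping{
		Name:        "nginx",
		Namespace:   "default",
		IP:          net.ParseIP("10.1.1.2"),
		HostNetwork: false,
		PortMappings: []*hostport.PortMapping{
			{HostPort: 8080, ContainerPort: 80, Protocol: v1.ProtocolTCP},
		},
	}

	// Add opens host port 8080 and installs a KUBE-HP-* chain (named from a
	// hash of the sandbox ID, host port and protocol) under KUBE-HOSTPORTS.
	// "sandbox-id" stands in for the pod sandbox ID, "cbr0" for the bridge
	// that localhost uses to reach the pod.
	if err := hm.Add("sandbox-id", mapping, "cbr0"); err != nil {
		log.Fatalf("failed to add hostports: %v", err)
	}

	// Remove deletes the chains and rules again and closes the opened sockets;
	// it does not need the pod IP, only the sandbox ID and the mappings.
	if err := hm.Remove("sandbox-id", mapping); err != nil {
		log.Fatalf("failed to remove hostports: %v", err)
	}
}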