From aabdaa984f3cff92421bb87197e1767d51407333 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Tue, 31 Jan 2017 10:31:37 -0800 Subject: [PATCH 1/4] refactor hostport logic --- pkg/kubelet/network/hostport/BUILD | 10 +- pkg/kubelet/network/hostport/hostport.go | 171 ++++++++++++++++++ .../network/hostport/hostport_syncer.go | 105 +---------- .../network/hostport/hostport_syncer_test.go | 22 +-- pkg/kubelet/network/hostport/hostport_test.go | 152 ++++++++++++++++ 5 files changed, 337 insertions(+), 123 deletions(-) create mode 100644 pkg/kubelet/network/hostport/hostport.go create mode 100644 pkg/kubelet/network/hostport/hostport_test.go diff --git a/pkg/kubelet/network/hostport/BUILD b/pkg/kubelet/network/hostport/BUILD index 64d9b90702f..79e9d6873b5 100644 --- a/pkg/kubelet/network/hostport/BUILD +++ b/pkg/kubelet/network/hostport/BUILD @@ -12,6 +12,8 @@ go_library( name = "go_default_library", srcs = [ "fake_iptables.go", + "hostport.go", + "hostport_manager.go", "hostport_syncer.go", ], tags = ["automanaged"], @@ -22,17 +24,23 @@ go_library( "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/util/errors", ], ) go_test( name = "go_default_test", - srcs = ["hostport_syncer_test.go"], + srcs = [ + "hostport_manager_test.go", + "hostport_syncer_test.go", + "hostport_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", "//pkg/util/iptables:go_default_library", + "//vendor:github.com/stretchr/testify/assert", ], ) diff --git a/pkg/kubelet/network/hostport/hostport.go b/pkg/kubelet/network/hostport/hostport.go new file mode 100644 index 00000000000..374df8eb6c8 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport.go @@ -0,0 +1,171 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "fmt" + "github.com/golang/glog" + "net" + "strings" + + "k8s.io/kubernetes/pkg/api/v1" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +const ( + // the hostport chain + kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS" + // prefix for hostport chains + kubeHostportChainPrefix string = "KUBE-HP-" +) + +// PortMapping represents a network port in a container +type PortMapping struct { + Name string + HostPort int32 + ContainerPort int32 + Protocol v1.Protocol + HostIP string +} + +// PodPortMapping represents a pod's network state and associated container port mappings +type PodPortMapping struct { + Namespace string + Name string + PortMappings []*PortMapping + HostNetwork bool + IP net.IP +} + +type hostport struct { + port int32 + protocol string +} + +type hostportOpener func(*hostport) (closeable, error) + +type closeable interface { + Close() error +} + +func openLocalPort(hp *hostport) (closeable, error) { + // For ports on node IPs, open the actual port and hold it, even though we + // use iptables to redirect traffic. 
+ // This ensures a) that it's safe to use that port and b) that (a) stays
+ // true. The risk is that some process on the node (e.g. sshd or kubelet)
+ // is using a port and we give that same port out to a Service. That would
+ // be bad because iptables would silently claim the traffic but the process
+ // would never know.
+ // NOTE: We should not need to have a real listen()ing socket - bind()
+ // should be enough, but I can't figure out a way to e2e test without
+ // it. Tools like 'ss' and 'netstat' do not show sockets that are
+ // bind()ed but not listen()ed, and at least the default debian netcat
+ // has no way to avoid about 10 seconds of retries.
+ var socket closeable
+ switch hp.protocol {
+ case "tcp":
+ listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port))
+ if err != nil {
+ return nil, err
+ }
+ socket = listener
+ case "udp":
+ addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port))
+ if err != nil {
+ return nil, err
+ }
+ conn, err := net.ListenUDP("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ socket = conn
+ default:
+ return nil, fmt.Errorf("unknown protocol %q", hp.protocol)
+ }
+ glog.V(3).Infof("Opened local port %s", hp.String())
+ return socket, nil
+}
+
+// openHostports opens all given hostports using the given hostportOpener.
+// If any error is encountered, it cleans up the opened ports and returns the error.
+// If all ports are opened successfully, it returns the hostport-to-socket mapping.
+// TODO: move openHostports and closeHostports into a common struct
+func openHostports(portOpener hostportOpener, podPortMapping *PodPortMapping) (map[hostport]closeable, error) {
+ var retErr error
+ ports := make(map[hostport]closeable)
+ for _, pm := range podPortMapping.PortMappings {
+ if pm.HostPort <= 0 {
+ continue
+ }
+ hp := portMappingToHostport(pm)
+ socket, err := portOpener(&hp)
+ if err != nil {
+ retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", pm.HostPort, getPodFullName(podPortMapping), err)
+ break
+ }
+ ports[hp] = socket
+ }
+
+ // If any error was encountered, close all hostports that were just opened.
+ if retErr != nil { + for hp, socket := range ports { + if err := socket.Close(); err != nil { + glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, getPodFullName(podPortMapping), err) + } + } + return nil, retErr + } + return ports, nil +} + +// portMappingToHostport creates hostport structure based on input portmapping +func portMappingToHostport(portMapping *PortMapping) hostport { + return hostport{ + port: portMapping.HostPort, + protocol: strings.ToLower(string(portMapping.Protocol)), + } +} + +// ensureKubeHostportChains ensures the KUBE-HOSTPORTS chain is setup correctly +func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName string) error { + glog.V(4).Info("Ensuring kubelet hostport chains") + // Ensure kubeHostportChain + if _, err := iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err) + } + tableChainsNeedJumpServices := []struct { + table utiliptables.Table + chain utiliptables.Chain + }{ + {utiliptables.TableNAT, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainPrerouting}, + } + args := []string{"-m", "comment", "--comment", "kube hostport portals", + "-m", "addrtype", "--dst-type", "LOCAL", + "-j", string(kubeHostportsChain)} + for _, tc := range tableChainsNeedJumpServices { + if _, err := iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err) + } + } + // Need to SNAT traffic from localhost + args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} + if _, err := iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) + } + return nil +} diff --git a/pkg/kubelet/network/hostport/hostport_syncer.go b/pkg/kubelet/network/hostport/hostport_syncer.go index a79949c1ff4..bba67095dd8 100644 --- a/pkg/kubelet/network/hostport/hostport_syncer.go +++ b/pkg/kubelet/network/hostport/hostport_syncer.go @@ -21,26 +21,17 @@ import ( "crypto/sha256" "encoding/base32" "fmt" - "net" "strings" "time" "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api/v1" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) -const ( - // the hostport chain - kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS" - // prefix for hostport chains - kubeHostportChainPrefix string = "KUBE-HP-" -) - // HostportSyncer takes a list of PodPortMappings and implements hostport all at once type HostportSyncer interface { // SyncHostports gathers all hostports on node and setup iptables rules to enable them. 
@@ -52,26 +43,6 @@ type HostportSyncer interface { OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error } -// PortMapping represents a network port in a container -type PortMapping struct { - Name string - HostPort int32 - ContainerPort int32 - Protocol v1.Protocol - HostIP string -} - -// PodPortMapping represents a pod's network state and associated container port mappings -type PodPortMapping struct { - Namespace string - Name string - PortMappings []*PortMapping - HostNetwork bool - IP net.IP -} - -type hostportOpener func(*hostport) (closeable, error) - type hostportSyncer struct { hostPortMap map[hostport]closeable iptables utiliptables.Interface @@ -87,15 +58,6 @@ func NewHostportSyncer() HostportSyncer { } } -type closeable interface { - Close() error -} - -type hostport struct { - port int32 - protocol string -} - type targetPod struct { podFullName string podIP string @@ -120,7 +82,7 @@ func (h *hostportSyncer) openHostports(podHostportMapping *PodPortMapping) error } socket, err := h.portOpener(&hp) if err != nil { - retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err) + retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err) break } ports[hp] = socket @@ -222,31 +184,8 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap return err } - glog.V(4).Info("Ensuring kubelet hostport chains") - // Ensure kubeHostportChain - if _, err := h.iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err) - } - tableChainsNeedJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - args := []string{"-m", "comment", "--comment", "kube hostport portals", - "-m", "addrtype", "--dst-type", "LOCAL", - "-j", string(kubeHostportsChain)} - for _, tc := range tableChainsNeedJumpServices { - if _, err := h.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err) - } - } - // Need to SNAT traffic from localhost - args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} - if _, err := h.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) - } + // Ensure KUBE-HOSTPORTS chains + ensureKubeHostportChains(h.iptables, natInterfaceName) // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore @@ -341,44 +280,6 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap return nil } -func openLocalPort(hp *hostport) (closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use iptables to redirect traffic. 
- // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because iptables would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket closeable - switch hp.protocol { - case "tcp": - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - socket = listener - case "udp": - addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", hp.protocol) - } - glog.V(3).Infof("Opened local port %s", hp.String()) - return socket, nil -} - // cleanupHostportMap closes obsolete hostports func (h *hostportSyncer) cleanupHostportMap(containerPortMap map[*PortMapping]targetPod) { // compute hostports that are supposed to be open diff --git a/pkg/kubelet/network/hostport/hostport_syncer_test.go b/pkg/kubelet/network/hostport/hostport_syncer_test.go index e0ce2c5820f..b24025ac692 100644 --- a/pkg/kubelet/network/hostport/hostport_syncer_test.go +++ b/pkg/kubelet/network/hostport/hostport_syncer_test.go @@ -17,7 +17,6 @@ limitations under the License. package hostport import ( - "fmt" "net" "reflect" "strings" @@ -27,24 +26,6 @@ import ( utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) -type fakeSocket struct { - port int32 - protocol string - closed bool -} - -func (f *fakeSocket) Close() error { - if f.closed { - return fmt.Errorf("Socket %q.%s already closed!", f.port, f.protocol) - } - f.closed = true - return nil -} - -func openFakeSocket(hp *hostport) (closeable, error) { - return &fakeSocket{hp.port, hp.protocol, false}, nil -} - type ruleMatch struct { hostport int chain string @@ -53,11 +34,12 @@ type ruleMatch struct { func TestOpenPodHostports(t *testing.T) { fakeIPTables := NewFakeIPTables() + fakeOpener := NewFakeSocketManager() h := &hostportSyncer{ hostPortMap: make(map[hostport]closeable), iptables: fakeIPTables, - portOpener: openFakeSocket, + portOpener: fakeOpener.openFakeSocket, } tests := []struct { diff --git a/pkg/kubelet/network/hostport/hostport_test.go b/pkg/kubelet/network/hostport/hostport_test.go new file mode 100644 index 00000000000..7c6d33c6593 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package hostport + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api/v1" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +type fakeSocket struct { + port int32 + protocol string + closed bool +} + +func (f *fakeSocket) Close() error { + if f.closed { + return fmt.Errorf("Socket %q.%s already closed!", f.port, f.protocol) + } + f.closed = true + return nil +} + +func NewFakeSocketManager() *fakeSocketManager { + return &fakeSocketManager{mem: make(map[hostport]*fakeSocket)} +} + +type fakeSocketManager struct { + mem map[hostport]*fakeSocket +} + +func (f *fakeSocketManager) openFakeSocket(hp *hostport) (closeable, error) { + if socket, ok := f.mem[*hp]; ok && !socket.closed { + return nil, fmt.Errorf("hostport is occupied") + } + fs := &fakeSocket{hp.port, hp.protocol, false} + f.mem[*hp] = fs + return fs, nil +} + +func TestOpenHostports(t *testing.T) { + opener := NewFakeSocketManager() + testCases := []struct { + podPortMapping *PodPortMapping + expectError bool + }{ + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n0", + }, + false, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n1", + PortMappings: []*PortMapping{ + {HostPort: 80, Protocol: v1.Protocol("TCP")}, + {HostPort: 8080, Protocol: v1.Protocol("TCP")}, + {HostPort: 443, Protocol: v1.Protocol("TCP")}, + }, + }, + false, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n2", + PortMappings: []*PortMapping{ + {HostPort: 80, Protocol: v1.Protocol("TCP")}, + }, + }, + true, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n3", + PortMappings: []*PortMapping{ + {HostPort: 8081, Protocol: v1.Protocol("TCP")}, + {HostPort: 8080, Protocol: v1.Protocol("TCP")}, + }, + }, + true, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n3", + PortMappings: []*PortMapping{ + {HostPort: 8081, Protocol: v1.Protocol("TCP")}, + }, + }, + false, + }, + } + + for _, tc := range testCases { + mapping, err := openHostports(opener.openFakeSocket, tc.podPortMapping) + if tc.expectError { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + assert.EqualValues(t, len(mapping), len(tc.podPortMapping.PortMappings)) + } +} + +func TestEnsureKubeHostportChains(t *testing.T) { + interfaceName := "cbr0" + builtinChains := []string{"PREROUTING", "OUTPUT"} + jumpRule := "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS" + masqRule := "-m comment --comment \"SNAT for localhost access to hostports\" -o cbr0 -s 127.0.0.0/8 -j MASQUERADE" + + fakeIPTables := NewFakeIPTables() + assert.NoError(t, ensureKubeHostportChains(fakeIPTables, interfaceName)) + + _, _, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain("KUBE-HOSTPORTS")) + assert.NoError(t, err) + + _, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.ChainPostrouting) + assert.NoError(t, err) + assert.EqualValues(t, len(chain.rules), 1) + assert.Contains(t, chain.rules[0], masqRule) + + for _, chainName := range builtinChains { + _, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain(chainName)) + assert.NoError(t, err) + assert.EqualValues(t, len(chain.rules), 1) + assert.Contains(t, chain.rules[0], jumpRule) + } + +} From 8e7219cbb4d66758d9e583631c9d59f780112508 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Thu, 2 Feb 2017 19:50:37 -0800 Subject: [PATCH 2/4] add hostport manager --- pkg/kubelet/network/hostport/fake_iptables.go | 13 +- .../network/hostport/hostport_manager.go | 328 
++++++++++++++++++ .../network/hostport/hostport_manager_test.go | 197 +++++++++++ 3 files changed, 537 insertions(+), 1 deletion(-) create mode 100644 pkg/kubelet/network/hostport/hostport_manager.go create mode 100644 pkg/kubelet/network/hostport/hostport_manager_test.go diff --git a/pkg/kubelet/network/hostport/fake_iptables.go b/pkg/kubelet/network/hostport/fake_iptables.go index dad4b5cd0f1..d8c05baddc2 100644 --- a/pkg/kubelet/network/hostport/fake_iptables.go +++ b/pkg/kubelet/network/hostport/fake_iptables.go @@ -300,11 +300,22 @@ func (f *fakeIPTables) restore(restoreTableName utiliptables.Table, data []byte, if err != nil { return err } - } else if strings.HasPrefix(line, "-X") { + } else if strings.HasPrefix(line, "-I") { parts := strings.Split(line, " ") if len(parts) < 3 { return fmt.Errorf("Invalid iptables rule '%s'", line) } + chainName := utiliptables.Chain(parts[1]) + rule := strings.TrimPrefix(line, fmt.Sprintf("-I %s ", chainName)) + _, err := f.ensureRule(utiliptables.Prepend, tableName, chainName, rule) + if err != nil { + return err + } + } else if strings.HasPrefix(line, "-X") { + parts := strings.Split(line, " ") + if len(parts) < 2 { + return fmt.Errorf("Invalid iptables rule '%s'", line) + } if err := f.DeleteChain(tableName, utiliptables.Chain(parts[1])); err != nil { return err } diff --git a/pkg/kubelet/network/hostport/hostport_manager.go b/pkg/kubelet/network/hostport/hostport_manager.go new file mode 100644 index 00000000000..5c18f8fce17 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport_manager.go @@ -0,0 +1,328 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "bytes" + "crypto/sha256" + "encoding/base32" + "fmt" + "strings" + "sync" + + "github.com/golang/glog" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" + utildbus "k8s.io/kubernetes/pkg/util/dbus" + utilexec "k8s.io/kubernetes/pkg/util/exec" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +// HostPortManager is an interface for adding and removing hostport for a given pod sandbox. +type HostPortManager interface { + // Add implements port mappings. + // id should be a unique identifier for a pod, e.g. podSandboxID. + // podPortMapping is the associated port mapping information for the pod. + // natInterfaceName is the interface that localhost used to talk to the given pod. 
+ Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) error + // Remove cleans up matching port mappings + // Remove must be able to clean up port mappings without pod IP + Remove(id string, podPortMapping *PodPortMapping) error +} + +type hostportManager struct { + hostPortMap map[hostport]closeable + iptables utiliptables.Interface + portOpener hostportOpener + mu sync.Mutex +} + +func NewHostportManager() HostPortManager { + iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) + return &hostportManager{ + hostPortMap: make(map[hostport]closeable), + iptables: iptInterface, + portOpener: openLocalPort, + } +} + +func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) { + if podPortMapping == nil || podPortMapping.HostNetwork { + return nil + } + podFullName := getPodFullName(podPortMapping) + + // skip if there is no hostport needed + hostportMappings := gatherHostportMappings(podPortMapping) + if len(hostportMappings) == 0 { + return nil + } + + if podPortMapping.IP.To4() == nil { + return fmt.Errorf("invalid or missing IP of pod %s", podFullName) + } + podIP := podPortMapping.IP.String() + + if err = ensureKubeHostportChains(hm.iptables, natInterfaceName); err != nil { + return err + } + + // Ensure atomicity for port opening and iptables operations + hm.mu.Lock() + defer hm.mu.Unlock() + + // try to open hostports + ports, err := openHostports(hm.portOpener, podPortMapping) + if err != nil { + return err + } + for hostport, socket := range ports { + hm.hostPortMap[hostport] = socket + } + + natChains := bytes.NewBuffer(nil) + natRules := bytes.NewBuffer(nil) + writeLine(natChains, "*nat") + + existingChains, existingRules, err := getExistingHostportIPTablesRules(hm.iptables) + if err != nil { + // clean up opened host port if encounter any error + return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) + } + + newChains := []utiliptables.Chain{} + for _, pm := range hostportMappings { + protocol := strings.ToLower(string(pm.Protocol)) + chain := getHostportChain(id, pm) + newChains = append(newChains, chain) + + // Add new hostport chain + writeLine(natChains, utiliptables.MakeChainLine(chain)) + + // Prepend the new chain to KUBE-HOSTPORTS + // This avoids any leaking iptables rule that takes up the same port + writeLine(natRules, "-I", string(kubeHostportsChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), + "-m", protocol, "-p", protocol, "--dport", fmt.Sprintf("%d", pm.HostPort), + "-j", string(chain), + ) + + // SNAT if the traffic comes from the pod itself + writeLine(natRules, "-A", string(chain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), + "-s", podIP, + "-j", string(iptablesproxy.KubeMarkMasqChain)) + + // DNAT to the podIP:containerPort + writeLine(natRules, "-A", string(chain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, podFullName, pm.HostPort), + "-m", protocol, "-p", protocol, + "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", podIP, pm.ContainerPort)) + } + + // getHostportChain should be able to provide unique hostport chain name using hash + // if there is a chain conflict or multiple Adds have been triggered for a single pod, + // filtering should be able to avoid further problem + filterChains(existingChains, newChains) + existingRules = filterRules(existingRules, newChains) + + for _, chain := range 
existingChains { + writeLine(natChains, chain) + } + for _, rule := range existingRules { + writeLine(natRules, rule) + } + writeLine(natRules, "COMMIT") + + if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil { + // clean up opened host port if encounter any error + return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) + } + return nil +} + +func (hm *hostportManager) Remove(id string, podPortMapping *PodPortMapping) (err error) { + if podPortMapping == nil || podPortMapping.HostNetwork { + return nil + } + + hostportMappings := gatherHostportMappings(podPortMapping) + if len(hostportMappings) <= 0 { + return nil + } + + // Ensure atomicity for port closing and iptables operations + hm.mu.Lock() + defer hm.mu.Unlock() + + var existingChains map[utiliptables.Chain]string + var existingRules []string + existingChains, existingRules, err = getExistingHostportIPTablesRules(hm.iptables) + if err != nil { + return err + } + + // Gather target hostport chains for removal + chainsToRemove := []utiliptables.Chain{} + for _, pm := range hostportMappings { + chainsToRemove = append(chainsToRemove, getHostportChain(id, pm)) + + // To preserve backward compatibility for k8s 1.5 or earlier. + // Need to remove hostport chains added by hostportSyncer if there is any + // TODO: remove this in 1.7 + chainsToRemove = append(chainsToRemove, hostportChainName(pm, getPodFullName(podPortMapping))) + } + + // remove rules that consists of target chains + remainingRules := filterRules(existingRules, chainsToRemove) + + // gather target hostport chains that exists in iptables-save result + existingChainsToRemove := []utiliptables.Chain{} + for _, chain := range chainsToRemove { + if _, ok := existingChains[chain]; ok { + existingChainsToRemove = append(existingChainsToRemove, chain) + } + } + + natChains := bytes.NewBuffer(nil) + natRules := bytes.NewBuffer(nil) + writeLine(natChains, "*nat") + for _, chain := range existingChains { + writeLine(natChains, chain) + } + for _, rule := range remainingRules { + writeLine(natRules, rule) + } + for _, chain := range existingChainsToRemove { + writeLine(natRules, "-X", string(chain)) + } + writeLine(natRules, "COMMIT") + + if err = hm.syncIPTables(append(natChains.Bytes(), natRules.Bytes()...)); err != nil { + return err + } + + // clean up opened pod host ports + return hm.closeHostports(hostportMappings) +} + +// syncIPTables executes iptables-restore with given lines +func (hm *hostportManager) syncIPTables(lines []byte) error { + glog.V(3).Infof("Restoring iptables rules: %s", lines) + err := hm.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + if err != nil { + return fmt.Errorf("Failed to execute iptables-restore: %v", err) + } + return nil +} + +// closeHostports tries to close all the listed host ports +// TODO: move closeHostports and openHostports into a common struct +func (hm *hostportManager) closeHostports(hostportMappings []*PortMapping) error { + errList := []error{} + for _, pm := range hostportMappings { + hp := portMappingToHostport(pm) + if socket, ok := hm.hostPortMap[hp]; ok { + glog.V(2).Infof("Closing host port %s", hp.String()) + if err := socket.Close(); err != nil { + errList = append(errList, fmt.Errorf("failed to close host port %s: %v", hp.String(), err)) + continue + } + delete(hm.hostPortMap, hp) + } + } + return utilerrors.NewAggregate(errList) +} + +// getHostportChain takes id, hostport and protocol for a pod and returns associated 
iptables chain. +// This is computed by hashing (sha256) then encoding to base32 and truncating with the prefix +// "KUBE-HP-". We do this because IPTables Chain Names must be <= 28 chars long, and the longer +// they are the harder they are to read. +// WARNING: Please do not change this function. Otherwise, HostportManager may not be able to +// identify existing iptables chains. +func getHostportChain(id string, pm *PortMapping) utiliptables.Chain { + hash := sha256.Sum256([]byte(id + string(pm.HostPort) + string(pm.Protocol))) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return utiliptables.Chain(kubeHostportChainPrefix + encoded[:16]) +} + +// gatherHostportMappings returns all the PortMappings which has hostport for a pod +func gatherHostportMappings(podPortMapping *PodPortMapping) []*PortMapping { + mappings := []*PortMapping{} + for _, pm := range podPortMapping.PortMappings { + if pm.HostPort <= 0 { + continue + } + mappings = append(mappings, pm) + } + return mappings +} + +// getExistingHostportIPTablesRules retrieves raw data from iptables-save, parse it, +// return all the hostport related chains and rules +func getExistingHostportIPTablesRules(iptables utiliptables.Interface) (map[utiliptables.Chain]string, []string, error) { + iptablesSaveRaw, err := iptables.Save(utiliptables.TableNAT) + if err != nil { // if we failed to get any rules + return nil, nil, fmt.Errorf("failed to execute iptables-save: %v", err) + } + existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) + + existingHostportChains := make(map[utiliptables.Chain]string) + existingHostportRules := []string{} + + for chain := range existingNATChains { + if strings.HasPrefix(string(chain), string(kubeHostportsChain)) || strings.HasPrefix(string(chain), kubeHostportChainPrefix) { + existingHostportChains[chain] = existingNATChains[chain] + } + } + + for _, line := range strings.Split(string(iptablesSaveRaw), "\n") { + if strings.HasPrefix(line, fmt.Sprintf("-A %s", kubeHostportChainPrefix)) || + strings.HasPrefix(line, fmt.Sprintf("-A %s", string(kubeHostportsChain))) { + existingHostportRules = append(existingHostportRules, line) + } + } + return existingHostportChains, existingHostportRules, nil +} + +// filterRules filters input rules with input chains. Rules that did not involve any filter chain will be returned. +// The order of the input rules is important and is preserved. +func filterRules(rules []string, filters []utiliptables.Chain) []string { + filtered := []string{} + for _, rule := range rules { + skip := false + for _, filter := range filters { + if strings.Contains(rule, string(filter)) { + skip = true + break + } + } + if !skip { + filtered = append(filtered, rule) + } + } + return filtered +} + +// filterChains deletes all entries of filter chains from chain map +func filterChains(chains map[utiliptables.Chain]string, filterChains []utiliptables.Chain) { + for _, chain := range filterChains { + if _, ok := chains[chain]; ok { + delete(chains, chain) + } + } +} diff --git a/pkg/kubelet/network/hostport/hostport_manager_test.go b/pkg/kubelet/network/hostport/hostport_manager_test.go new file mode 100644 index 00000000000..3499dc2d34f --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport_manager_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api/v1" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" + "strings" +) + +func NewFakeHostportManager() HostPortManager { + return &hostportManager{ + hostPortMap: make(map[hostport]closeable), + iptables: NewFakeIPTables(), + portOpener: NewFakeSocketManager().openFakeSocket, + } +} + +func TestHostportManager(t *testing.T) { + iptables := NewFakeIPTables() + portOpener := NewFakeSocketManager() + manager := &hostportManager{ + hostPortMap: make(map[hostport]closeable), + iptables: iptables, + portOpener: portOpener.openFakeSocket, + } + + testCases := []struct { + mapping *PodPortMapping + expectError bool + }{ + { + mapping: &PodPortMapping{ + Name: "pod1", + Namespace: "ns1", + IP: net.ParseIP("10.1.1.2"), + HostNetwork: false, + PortMappings: []*PortMapping{ + { + HostPort: 8080, + ContainerPort: 80, + Protocol: v1.ProtocolTCP, + }, + { + HostPort: 8081, + ContainerPort: 81, + Protocol: v1.ProtocolUDP, + }, + }, + }, + expectError: false, + }, + { + mapping: &PodPortMapping{ + Name: "pod2", + Namespace: "ns1", + IP: net.ParseIP("10.1.1.3"), + HostNetwork: false, + PortMappings: []*PortMapping{ + { + HostPort: 8082, + ContainerPort: 80, + Protocol: v1.ProtocolTCP, + }, + { + HostPort: 8081, + ContainerPort: 81, + Protocol: v1.ProtocolUDP, + }, + }, + }, + expectError: true, + }, + { + mapping: &PodPortMapping{ + Name: "pod3", + Namespace: "ns1", + IP: net.ParseIP("10.1.1.4"), + HostNetwork: false, + PortMappings: []*PortMapping{ + { + HostPort: 8443, + ContainerPort: 443, + Protocol: v1.ProtocolTCP, + }, + }, + }, + expectError: false, + }, + } + + // Add Hostports + for _, tc := range testCases { + err := manager.Add("id", tc.mapping, "cbr0") + if tc.expectError { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + } + + // Check port opened + expectedPorts := []hostport{{8080, "tcp"}, {8081, "udp"}, {8443, "tcp"}} + openedPorts := make(map[hostport]bool) + for hp, port := range portOpener.mem { + if !port.closed { + openedPorts[hp] = true + } + } + assert.EqualValues(t, len(openedPorts), len(expectedPorts)) + for _, hp := range expectedPorts { + _, ok := openedPorts[hp] + assert.EqualValues(t, true, ok) + } + + // Check Iptables-save result after adding hostports + raw, err := iptables.Save(utiliptables.TableNAT) + assert.NoError(t, err) + + lines := strings.Split(string(raw), "\n") + expectedLines := map[string]bool{ + `*nat`: true, + `:KUBE-HOSTPORTS - [0:0]`: true, + `:OUTPUT - [0:0]`: true, + `:PREROUTING - [0:0]`: true, + `:POSTROUTING - [0:0]`: true, + `:KUBE-HP-4YVONL46AKYWSKS3 - [0:0]`: true, + `:KUBE-HP-7THKRFSEH4GIIXK7 - [0:0]`: true, + `:KUBE-HP-5N7UH5JAXCVP5UJR - [0:0]`: true, + "-A KUBE-HOSTPORTS -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp --dport 8443 -j KUBE-HP-5N7UH5JAXCVP5UJR": true, + "-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp --dport 8081 -j KUBE-HP-7THKRFSEH4GIIXK7": true, + "-A KUBE-HOSTPORTS -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp --dport 8080 -j 
KUBE-HP-4YVONL46AKYWSKS3": true, + "-A OUTPUT -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true, + "-A PREROUTING -m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS": true, + "-A POSTROUTING -m comment --comment \"SNAT for localhost access to hostports\" -o cbr0 -s 127.0.0.0/8 -j MASQUERADE": true, + "-A KUBE-HP-4YVONL46AKYWSKS3 -m comment --comment \"pod1_ns1 hostport 8080\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ": true, + "-A KUBE-HP-4YVONL46AKYWSKS3 -m comment --comment \"pod1_ns1 hostport 8080\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.2:80": true, + "-A KUBE-HP-7THKRFSEH4GIIXK7 -m comment --comment \"pod1_ns1 hostport 8081\" -s 10.1.1.2/32 -j KUBE-MARK-MASQ": true, + "-A KUBE-HP-7THKRFSEH4GIIXK7 -m comment --comment \"pod1_ns1 hostport 8081\" -m udp -p udp -j DNAT --to-destination 10.1.1.2:81": true, + "-A KUBE-HP-5N7UH5JAXCVP5UJR -m comment --comment \"pod3_ns1 hostport 8443\" -s 10.1.1.4/32 -j KUBE-MARK-MASQ": true, + "-A KUBE-HP-5N7UH5JAXCVP5UJR -m comment --comment \"pod3_ns1 hostport 8443\" -m tcp -p tcp -j DNAT --to-destination 10.1.1.4:443": true, + `COMMIT`: true, + } + for _, line := range lines { + if len(strings.TrimSpace(line)) > 0 { + _, ok := expectedLines[strings.TrimSpace(line)] + assert.EqualValues(t, true, ok) + } + } + + // Remove all added hostports + for _, tc := range testCases { + if !tc.expectError { + err := manager.Remove("id", tc.mapping) + assert.NoError(t, err) + } + } + + // Check Iptables-save result after deleting hostports + raw, err = iptables.Save(utiliptables.TableNAT) + assert.NoError(t, err) + lines = strings.Split(string(raw), "\n") + remainingChains := make(map[string]bool) + for _, line := range lines { + if strings.HasPrefix(line, ":") { + remainingChains[strings.TrimSpace(line)] = true + } + } + expectDeletedChains := []string{"KUBE-HP-4YVONL46AKYWSKS3", "KUBE-HP-7THKRFSEH4GIIXK7", "KUBE-HP-5N7UH5JAXCVP5UJR"} + for _, chain := range expectDeletedChains { + _, ok := remainingChains[chain] + assert.EqualValues(t, false, ok) + } + + // check if all ports are closed + for _, port := range portOpener.mem { + assert.EqualValues(t, true, port.closed) + } +} From bd05e1af2b754ceca7203d9a930939dbdfddbfe1 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Fri, 3 Feb 2017 13:11:01 -0800 Subject: [PATCH 3/4] add portmapping getter into network host --- pkg/kubelet/dockershim/docker_service.go | 73 ++++++++++++++++++++---- pkg/kubelet/kubelet.go | 2 +- pkg/kubelet/network/cni/cni_test.go | 2 + pkg/kubelet/network/plugins.go | 31 +++++++--- pkg/kubelet/network/testing/fake_host.go | 10 ++++ pkg/kubelet/networks.go | 3 + 6 files changed, 101 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index f206d6474cd..aa8601bd759 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -25,6 +25,7 @@ import ( dockertypes "github.com/docker/engine-api/types" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apis/componentconfig" internalapi "k8s.io/kubernetes/pkg/kubelet/api" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" @@ -34,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/cni" + "k8s.io/kubernetes/pkg/kubelet/network/hostport" "k8s.io/kubernetes/pkg/kubelet/network/kubenet" 
"k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/kubelet/util/cache" @@ -107,6 +109,35 @@ type NetworkPluginSettings struct { LegacyRuntimeHost network.LegacyHost } +// namespaceGetter is a wrapper around the dockerService that implements +// the network.NamespaceGetter interface. +type namespaceGetter struct { + ds *dockerService +} + +func (n *namespaceGetter) GetNetNS(containerID string) (string, error) { + return n.ds.GetNetNS(containerID) +} + +// portMappingGetter is a wrapper around the dockerService that implements +// the network.PortMappingGetter interface. +type portMappingGetter struct { + ds *dockerService +} + +func (p *portMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) { + return p.ds.GetPodPortMappings(containerID) +} + +// dockerNetworkHost implements network.Host by wrapping the legacy host passed in by the kubelet +// and dockerServices which implementes the rest of the network host interfaces. +// The legacy host methods are slated for deletion. +type dockerNetworkHost struct { + network.LegacyHost + *namespaceGetter + *portMappingGetter +} + var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey} // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. @@ -138,6 +169,7 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str netHost := &dockerNetworkHost{ pluginSettings.LegacyRuntimeHost, &namespaceGetter{ds}, + &portMappingGetter{ds}, } plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU) if err != nil { @@ -240,12 +272,6 @@ func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeCo return } -// namespaceGetter is a wrapper around the dockerService that implements -// the network.NamespaceGetter interface. -type namespaceGetter struct { - *dockerService -} - // GetNetNS returns the network namespace of the given containerID. The ID // supplied is typically the ID of a pod sandbox. This getter doesn't try // to map non-sandbox IDs to their respective sandboxes. @@ -257,12 +283,24 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) { return getNetworkNamespace(r), nil } -// dockerNetworkHost implements network.Host by wrapping the legacy host -// passed in by the kubelet and adding NamespaceGetter methods. The legacy -// host methods are slated for deletion. -type dockerNetworkHost struct { - network.LegacyHost - *namespaceGetter +// GetPodPortMappings returns the port mappings of the given podSandbox ID. +func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) { + // TODO: get portmappings from docker labels for backward compatibility + checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) + if err != nil { + return nil, err + } + + portMappings := []*hostport.PortMapping{} + for _, pm := range checkpoint.Data.PortMappings { + proto := toAPIProtocol(*pm.Protocol) + portMappings = append(portMappings, &hostport.PortMapping{ + HostPort: *pm.HostPort, + ContainerPort: *pm.ContainerPort, + Protocol: proto, + }) + } + return portMappings, nil } // Start initializes and starts components in dockerService. 
@@ -351,3 +389,14 @@ func (ds *dockerService) getDockerVersionFromCache() (*dockertypes.Version, erro } return dv, nil } + +func toAPIProtocol(protocol Protocol) v1.Protocol { + switch protocol { + case protocolTCP: + return v1.ProtocolTCP + case protocolUDP: + return v1.ProtocolUDP + } + glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol) + return v1.ProtocolTCP +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 65e4bdfc2cf..c2f38a54894 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -484,7 +484,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } glog.Infof("Hairpin mode set to %q", klet.hairpinMode) - if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil { + if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil { return nil, err } else { klet.networkPlugin = plug diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index c2c8a1e73de..1d76236fea8 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -39,6 +39,7 @@ import ( containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/cni/testing" + networktest "k8s.io/kubernetes/pkg/kubelet/network/testing" utilexec "k8s.io/kubernetes/pkg/util/exec" ) @@ -111,6 +112,7 @@ func tearDownPlugin(tmpDir string) { } type fakeNetworkHost struct { + networktest.FakePortMappingGetter kubeClient clientset.Interface runtime kubecontainer.Runtime } diff --git a/pkg/kubelet/network/plugins.go b/pkg/kubelet/network/plugins.go index ec59e09d8cc..ed925562e1c 100644 --- a/pkg/kubelet/network/plugins.go +++ b/pkg/kubelet/network/plugins.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/apis/componentconfig" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/network/hostport" utilexec "k8s.io/kubernetes/pkg/util/exec" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" ) @@ -111,9 +112,9 @@ type LegacyHost interface { // Only used for hostport management GetRuntime() kubecontainer.Runtime - // SupportsLegacyFeaturs returns true if this host can support hostports - // and bandwidth shaping. Both will either get added to CNI or dropped, - // so differnt implementations can choose to ignore them. + // SupportsLegacyFeatures returns true if the network host support GetPodByName, KubeClient interface and kubelet + // runtime interface. These interfaces will no longer be implemented by CRI shims. + // This function helps network plugins to choose their behavior based on runtime. SupportsLegacyFeatures() bool } @@ -121,17 +122,19 @@ type LegacyHost interface { // TODO(#35457): get rid of this backchannel to the kubelet. The scope of // the back channel is restricted to host-ports/testing, and restricted // to kubenet. No other network plugin wrapper needs it. Other plugins -// only require a way to access namespace information, which they can do -// directly through the embedded NamespaceGetter. 
+// only require a way to access namespace information and port mapping
+// information, which they can do directly through the embedded interfaces.
 type Host interface {
 // NamespaceGetter is a getter for sandbox namespace information.
- // It's the only part of this interface that isn't currently deprecated.
 NamespaceGetter

+ // PortMappingGetter is a getter for sandbox port mapping information.
+ PortMappingGetter
+
 // LegacyHost contains methods that trap back into the Kubelet. Dependence
 // *do not* add more dependencies in this interface. In a post-cri world,
 // network plugins will be invoked by the runtime shim, and should only
- // require NamespaceGetter.
+ // require GetNetNS and GetPodPortMappings.
 LegacyHost
 }
@@ -143,6 +146,14 @@ type NamespaceGetter interface {
 GetNetNS(containerID string) (string, error)
 }

+// PortMappingGetter is an interface to retrieve port mapping information for a given
+// sandboxID. Typically implemented by runtime shims that are closely coupled to
+// CNI plugin wrappers like kubenet.
+type PortMappingGetter interface {
+ // GetPodPortMappings returns sandbox port mapping information.
+ GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error)
+}
+
 // InitNetworkPlugin inits the plugin that matches networkPluginName. Plugins must have unique names.
 func InitNetworkPlugin(plugins []NetworkPlugin, networkPluginName string, host Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) (NetworkPlugin, error) {
 if networkPluginName == "" {
@@ -276,3 +287,9 @@ func GetPodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName s

 return ip, nil
 }
+
+type NoopPortMappingGetter struct{}
+
+func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
+ return nil, nil
+}
diff --git a/pkg/kubelet/network/testing/fake_host.go b/pkg/kubelet/network/testing/fake_host.go
index 61761a8c542..ba1922f282a 100644
--- a/pkg/kubelet/network/testing/fake_host.go
+++ b/pkg/kubelet/network/testing/fake_host.go
@@ -24,10 +24,12 @@ import (
 "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
+ "k8s.io/kubernetes/pkg/kubelet/network/hostport"
 )

 type fakeNetworkHost struct {
 fakeNamespaceGetter
+ FakePortMappingGetter
 kubeClient clientset.Interface
 Legacy bool
 Runtime *containertest.FakeRuntime
@@ -61,3 +63,11 @@ type fakeNamespaceGetter struct {
 func (nh *fakeNamespaceGetter) GetNetNS(containerID string) (string, error) {
 return nh.ns, nil
 }
+
+type FakePortMappingGetter struct {
+ mem map[string][]*hostport.PortMapping
+}
+
+func (pm *FakePortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
+ return pm.mem[containerID], nil
+}
diff --git a/pkg/kubelet/networks.go b/pkg/kubelet/networks.go
index 6481fe87209..d8382433372 100644
--- a/pkg/kubelet/networks.go
+++ b/pkg/kubelet/networks.go
@@ -20,6 +20,7 @@ import (
 "k8s.io/kubernetes/pkg/api/v1"
 "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+ "k8s.io/kubernetes/pkg/kubelet/network"
 )

 // This just exports required functions from kubelet proper, for use by network
@@ -54,6 +55,8 @@ func (nh *networkHost) SupportsLegacyFeatures() bool {
 // methods, because networkHost is slated for deletion.
 type criNetworkHost struct {
 *networkHost
+ // criNetworkHost currently supports legacy features.
+ // Hence it does not need a real PortMappingGetter.
+ *network.NoopPortMappingGetter
 }

 // GetNetNS returns the network namespace of the given containerID.

From be9eca6b510c67a74f1dad5a39dd044d2b697b50 Mon Sep 17 00:00:00 2001
From: Minhan Xia
Date: Fri, 3 Feb 2017 13:26:42 -0800
Subject: [PATCH 4/4] teach kubenet to use hostport_manager

---
 pkg/kubelet/dockershim/BUILD | 1 +
 pkg/kubelet/network/BUILD | 1 +
 pkg/kubelet/network/cni/BUILD | 1 +
 pkg/kubelet/network/kubenet/kubenet_linux.go | 101 ++++++++++++-------
 pkg/kubelet/network/testing/BUILD | 1 +
 5 files changed, 69 insertions(+), 36 deletions(-)

diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD
index 9977604cbbf..5e216f2b645 100644
--- a/pkg/kubelet/dockershim/BUILD
+++ b/pkg/kubelet/dockershim/BUILD
@@ -39,6 +39,7 @@ go_library(
 "//pkg/kubelet/leaky:go_default_library",
 "//pkg/kubelet/network:go_default_library",
 "//pkg/kubelet/network/cni:go_default_library",
+ "//pkg/kubelet/network/hostport:go_default_library",
 "//pkg/kubelet/network/kubenet:go_default_library",
 "//pkg/kubelet/qos:go_default_library",
 "//pkg/kubelet/server/streaming:go_default_library",
diff --git a/pkg/kubelet/network/BUILD b/pkg/kubelet/network/BUILD
index b20729285b0..1e493f99dc4 100644
--- a/pkg/kubelet/network/BUILD
+++ b/pkg/kubelet/network/BUILD
@@ -20,6 +20,7 @@ go_library(
 "//pkg/apis/componentconfig:go_default_library",
 "//pkg/client/clientset_generated/clientset:go_default_library",
 "//pkg/kubelet/container:go_default_library",
+ "//pkg/kubelet/network/hostport:go_default_library",
 "//pkg/util/exec:go_default_library",
 "//pkg/util/sysctl:go_default_library",
 "//vendor:github.com/golang/glog",
diff --git a/pkg/kubelet/network/cni/BUILD b/pkg/kubelet/network/cni/BUILD
index c3dbf3b9733..eea4163f1aa 100644
--- a/pkg/kubelet/network/cni/BUILD
+++ b/pkg/kubelet/network/cni/BUILD
@@ -37,6 +37,7 @@ go_test(
 "//pkg/kubelet/container/testing:go_default_library",
 "//pkg/kubelet/network:go_default_library",
 "//pkg/kubelet/network/cni/testing:go_default_library",
+ "//pkg/kubelet/network/testing:go_default_library",
 "//pkg/util/exec:go_default_library",
 "//vendor:github.com/containernetworking/cni/pkg/types",
 "//vendor:github.com/stretchr/testify/mock",
diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go
index 5cab15784b8..86454c8cad6 100644
--- a/pkg/kubelet/network/kubenet/kubenet_linux.go
+++ b/pkg/kubelet/network/kubenet/kubenet_linux.go
@@ -89,7 +89,11 @@ type kubenetNetworkPlugin struct {
 execer utilexec.Interface
 nsenterPath string
 hairpinMode componentconfig.HairpinMode
+ // kubenet can use either the hostportSyncer or the hostportManager to implement hostports.
+ // Currently, if the network host supports legacy features, the hostportSyncer is used;
+ // otherwise, the hostportManager is used.
 hostportSyncer hostport.HostportSyncer
+ hostportManager hostport.HostPortManager
 iptables utiliptables.Interface
 sysctl utilsysctl.Interface
 ebtables utilebtables.Interface
@@ -114,6 +118,7 @@ func NewPlugin(networkPluginDir string) network.NetworkPlugin {
 sysctl: sysctl,
 vendorDir: networkPluginDir,
 hostportSyncer: hostport.NewHostportSyncer(),
+ hostportManager: hostport.NewHostportManager(),
 nonMasqueradeCIDR: "10.0.0.0/8",
 }
 }
@@ -356,35 +361,48 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
 // The host can choose to not support "legacy" features. The remote
 // shim doesn't support it (#35457), but the kubelet does.
- if !plugin.host.SupportsLegacyFeatures() { - return nil - } + if plugin.host.SupportsLegacyFeatures() { + // The first SetUpPod call creates the bridge; get a shaper for the sake of + // initialization + shaper := plugin.shaper() - // The first SetUpPod call creates the bridge; get a shaper for the sake of - // initialization - shaper := plugin.shaper() + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) + if err != nil { + return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) + } + if egress != nil || ingress != nil { + if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil { + return fmt.Errorf("Failed to add pod to shaper: %v", err) + } + } - ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) - if err != nil { - return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) - } - if egress != nil || ingress != nil { - if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil { - return fmt.Errorf("Failed to add pod to shaper: %v", err) + // Open any hostports the pod's containers want + activePodPortMapping, err := plugin.getPodPortMapping() + if err != nil { + return err + } + + newPodPortMapping := constructPodPortMapping(pod, ip4) + if err := plugin.hostportSyncer.OpenPodHostportsAndSync(newPodPortMapping, BridgeName, activePodPortMapping); err != nil { + return err + } + } else { + portMappings, err := plugin.host.GetPodPortMappings(id.ID) + if err != nil { + return err + } + if portMappings != nil && len(portMappings) > 0 { + if err := plugin.hostportManager.Add(id.ID, &hostport.PodPortMapping{ + Namespace: namespace, + Name: name, + PortMappings: portMappings, + IP: ip4, + HostNetwork: false, + }, BridgeName); err != nil { + return err + } } } - - // Open any hostports the pod's containers want - activePodPortMapping, err := plugin.getPodPortMapping() - if err != nil { - return err - } - - newPodPortMapping := constructPodPortMapping(pod, ip4) - if err := plugin.hostportSyncer.OpenPodHostportsAndSync(newPodPortMapping, BridgeName, activePodPortMapping); err != nil { - return err - } - return nil } @@ -467,18 +485,29 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k // The host can choose to not support "legacy" features. The remote // shim doesn't support it (#35457), but the kubelet does. 
- if !plugin.host.SupportsLegacyFeatures() { - return utilerrors.NewAggregate(errList) + if plugin.host.SupportsLegacyFeatures() { + activePodPortMapping, err := plugin.getPodPortMapping() + if err == nil { + err = plugin.hostportSyncer.SyncHostports(BridgeName, activePodPortMapping) + } + if err != nil { + errList = append(errList, err) + } + } else { + portMappings, err := plugin.host.GetPodPortMappings(id.ID) + if err != nil { + errList = append(errList, err) + } else if portMappings != nil && len(portMappings) > 0 { + if err = plugin.hostportManager.Remove(id.ID, &hostport.PodPortMapping{ + Namespace: namespace, + Name: name, + PortMappings: portMappings, + HostNetwork: false, + }); err != nil { + errList = append(errList, err) + } + } } - - activePodPortMapping, err := plugin.getPodPortMapping() - if err == nil { - err = plugin.hostportSyncer.SyncHostports(BridgeName, activePodPortMapping) - } - if err != nil { - errList = append(errList, err) - } - return utilerrors.NewAggregate(errList) } diff --git a/pkg/kubelet/network/testing/BUILD b/pkg/kubelet/network/testing/BUILD index 98bccad274b..b4082c77ddf 100644 --- a/pkg/kubelet/network/testing/BUILD +++ b/pkg/kubelet/network/testing/BUILD @@ -16,6 +16,7 @@ go_library( "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", + "//pkg/kubelet/network/hostport:go_default_library", ], )
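A note on the chain naming in hostport_manager.go above: getHostportChain hashes the sandbox ID, host port, and protocol together, so two pods claiming the same port, or repeated Adds for the same pod, cannot collide on a chain name. The standalone sketch below mirrors that scheme outside the kubelet tree; the inputs "id" and 8080/TCP are the same ones the fixtures in hostport_manager_test.go use. Note that string(rune(port)) reproduces the patch's string(pm.HostPort) conversion, which treats the port as a Unicode code point rather than formatting it as decimal digits.

package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

// chainName mirrors getHostportChain: sha256 over the concatenated inputs,
// base32-encoded, truncated to 16 characters after the "KUBE-HP-" prefix so
// the result stays within iptables' 28-character chain-name limit.
func chainName(id string, port int32, protocol string) string {
	hash := sha256.Sum256([]byte(id + string(rune(port)) + protocol))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return "KUBE-HP-" + encoded[:16]
}

func main() {
	// Same inputs as the "id"/8080/TCP case in hostport_manager_test.go.
	fmt.Println(chainName("id", 8080, "TCP"))
}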
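For callers, the pattern patch 4 wires into kubenet is a single Add when the sandbox comes up and a matching Remove on teardown, keyed by the same sandbox ID. A minimal sketch of that call pattern follows, using the package path from this series; the sandbox ID, pod values, and the "cbr0" bridge name are illustrative, and actually running it requires root and a working iptables.

package main

import (
	"net"

	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)

func main() {
	mgr := hostport.NewHostportManager()

	mapping := &hostport.PodPortMapping{
		Namespace: "ns1",
		Name:      "pod1",
		IP:        net.ParseIP("10.1.1.2"), // Add requires a valid IPv4 pod IP
		PortMappings: []*hostport.PortMapping{
			{HostPort: 8080, ContainerPort: 80, Protocol: v1.ProtocolTCP},
		},
	}

	// "sandbox-1" stands in for the pod sandbox ID passed by the runtime shim.
	if err := mgr.Add("sandbox-1", mapping, "cbr0"); err != nil {
		panic(err)
	}

	// Remove is keyed on the same ID and works without the pod IP, so it can
	// still clean up after the sandbox is already gone.
	if err := mgr.Remove("sandbox-1", mapping); err != nil {
		panic(err)
	}
}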