diff --git a/pkg/kubelet/network/hostport/BUILD b/pkg/kubelet/network/hostport/BUILD index 64d9b90702f..79e9d6873b5 100644 --- a/pkg/kubelet/network/hostport/BUILD +++ b/pkg/kubelet/network/hostport/BUILD @@ -12,6 +12,8 @@ go_library( name = "go_default_library", srcs = [ "fake_iptables.go", + "hostport.go", + "hostport_manager.go", "hostport_syncer.go", ], tags = ["automanaged"], @@ -22,17 +24,23 @@ go_library( "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/util/errors", ], ) go_test( name = "go_default_test", - srcs = ["hostport_syncer_test.go"], + srcs = [ + "hostport_manager_test.go", + "hostport_syncer_test.go", + "hostport_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", "//pkg/util/iptables:go_default_library", + "//vendor:github.com/stretchr/testify/assert", ], ) diff --git a/pkg/kubelet/network/hostport/hostport.go b/pkg/kubelet/network/hostport/hostport.go new file mode 100644 index 00000000000..374df8eb6c8 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport.go @@ -0,0 +1,171 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "fmt" + "github.com/golang/glog" + "net" + "strings" + + "k8s.io/kubernetes/pkg/api/v1" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +const ( + // the hostport chain + kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS" + // prefix for hostport chains + kubeHostportChainPrefix string = "KUBE-HP-" +) + +// PortMapping represents a network port in a container +type PortMapping struct { + Name string + HostPort int32 + ContainerPort int32 + Protocol v1.Protocol + HostIP string +} + +// PodPortMapping represents a pod's network state and associated container port mappings +type PodPortMapping struct { + Namespace string + Name string + PortMappings []*PortMapping + HostNetwork bool + IP net.IP +} + +type hostport struct { + port int32 + protocol string +} + +type hostportOpener func(*hostport) (closeable, error) + +type closeable interface { + Close() error +} + +func openLocalPort(hp *hostport) (closeable, error) { + // For ports on node IPs, open the actual port and hold it, even though we + // use iptables to redirect traffic. + // This ensures a) that it's safe to use that port and b) that (a) stays + // true. The risk is that some process on the node (e.g. sshd or kubelet) + // is using a port and we give that same port out to a Service. That would + // be bad because iptables would silently claim the traffic but the process + // would never know. + // NOTE: We should not need to have a real listen()ing socket - bind() + // should be enough, but I can't figure out a way to e2e test without + // it. Tools like 'ss' and 'netstat' do not show sockets that are + // bind()ed but not listen()ed, and at least the default debian netcat + // has no way to avoid about 10 seconds of retries. + var socket closeable + switch hp.protocol { + case "tcp": + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port)) + if err != nil { + return nil, err + } + socket = listener + case "udp": + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port)) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + socket = conn + default: + return nil, fmt.Errorf("unknown protocol %q", hp.protocol) + } + glog.V(3).Infof("Opened local port %s", hp.String()) + return socket, nil +} + +// openHostports opens all given hostports using the given hostportOpener +// If encounter any error, clean up and return the error +// If all ports are opened successfully, return the hostport and socket mapping +// TODO: move openHostports and closeHostports into a common struct +func openHostports(portOpener hostportOpener, podPortMapping *PodPortMapping) (map[hostport]closeable, error) { + var retErr error + ports := make(map[hostport]closeable) + for _, pm := range podPortMapping.PortMappings { + if pm.HostPort <= 0 { + continue + } + hp := portMappingToHostport(pm) + socket, err := portOpener(&hp) + if err != nil { + retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", pm.HostPort, getPodFullName(podPortMapping), err) + break + } + ports[hp] = socket + } + + // If encounter any error, close all hostports that just got opened. + if retErr != nil { + for hp, socket := range ports { + if err := socket.Close(); err != nil { + glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, getPodFullName(podPortMapping), err) + } + } + return nil, retErr + } + return ports, nil +} + +// portMappingToHostport creates hostport structure based on input portmapping +func portMappingToHostport(portMapping *PortMapping) hostport { + return hostport{ + port: portMapping.HostPort, + protocol: strings.ToLower(string(portMapping.Protocol)), + } +} + +// ensureKubeHostportChains ensures the KUBE-HOSTPORTS chain is setup correctly +func ensureKubeHostportChains(iptables utiliptables.Interface, natInterfaceName string) error { + glog.V(4).Info("Ensuring kubelet hostport chains") + // Ensure kubeHostportChain + if _, err := iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err) + } + tableChainsNeedJumpServices := []struct { + table utiliptables.Table + chain utiliptables.Chain + }{ + {utiliptables.TableNAT, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainPrerouting}, + } + args := []string{"-m", "comment", "--comment", "kube hostport portals", + "-m", "addrtype", "--dst-type", "LOCAL", + "-j", string(kubeHostportsChain)} + for _, tc := range tableChainsNeedJumpServices { + if _, err := iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err) + } + } + // Need to SNAT traffic from localhost + args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} + if _, err := iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) + } + return nil +} diff --git a/pkg/kubelet/network/hostport/hostport_syncer.go b/pkg/kubelet/network/hostport/hostport_syncer.go index a79949c1ff4..bba67095dd8 100644 --- a/pkg/kubelet/network/hostport/hostport_syncer.go +++ b/pkg/kubelet/network/hostport/hostport_syncer.go @@ -21,26 +21,17 @@ import ( "crypto/sha256" "encoding/base32" "fmt" - "net" "strings" "time" "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api/v1" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) -const ( - // the hostport chain - kubeHostportsChain utiliptables.Chain = "KUBE-HOSTPORTS" - // prefix for hostport chains - kubeHostportChainPrefix string = "KUBE-HP-" -) - // HostportSyncer takes a list of PodPortMappings and implements hostport all at once type HostportSyncer interface { // SyncHostports gathers all hostports on node and setup iptables rules to enable them. @@ -52,26 +43,6 @@ type HostportSyncer interface { OpenPodHostportsAndSync(newPortMapping *PodPortMapping, natInterfaceName string, activePodPortMappings []*PodPortMapping) error } -// PortMapping represents a network port in a container -type PortMapping struct { - Name string - HostPort int32 - ContainerPort int32 - Protocol v1.Protocol - HostIP string -} - -// PodPortMapping represents a pod's network state and associated container port mappings -type PodPortMapping struct { - Namespace string - Name string - PortMappings []*PortMapping - HostNetwork bool - IP net.IP -} - -type hostportOpener func(*hostport) (closeable, error) - type hostportSyncer struct { hostPortMap map[hostport]closeable iptables utiliptables.Interface @@ -87,15 +58,6 @@ func NewHostportSyncer() HostportSyncer { } } -type closeable interface { - Close() error -} - -type hostport struct { - port int32 - protocol string -} - type targetPod struct { podFullName string podIP string @@ -120,7 +82,7 @@ func (h *hostportSyncer) openHostports(podHostportMapping *PodPortMapping) error } socket, err := h.portOpener(&hp) if err != nil { - retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err) + retErr = fmt.Errorf("cannot open hostport %d for pod %s: %v", port.HostPort, getPodFullName(podHostportMapping), err) break } ports[hp] = socket @@ -222,31 +184,8 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap return err } - glog.V(4).Info("Ensuring kubelet hostport chains") - // Ensure kubeHostportChain - if _, err := h.iptables.EnsureChain(utiliptables.TableNAT, kubeHostportsChain); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeHostportsChain, err) - } - tableChainsNeedJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - args := []string{"-m", "comment", "--comment", "kube hostport portals", - "-m", "addrtype", "--dst-type", "LOCAL", - "-j", string(kubeHostportsChain)} - for _, tc := range tableChainsNeedJumpServices { - if _, err := h.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeHostportsChain, err) - } - } - // Need to SNAT traffic from localhost - args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", natInterfaceName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} - if _, err := h.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) - } + // Ensure KUBE-HOSTPORTS chains + ensureKubeHostportChains(h.iptables, natInterfaceName) // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore @@ -341,44 +280,6 @@ func (h *hostportSyncer) SyncHostports(natInterfaceName string, activePodPortMap return nil } -func openLocalPort(hp *hostport) (closeable, error) { - // For ports on node IPs, open the actual port and hold it, even though we - // use iptables to redirect traffic. - // This ensures a) that it's safe to use that port and b) that (a) stays - // true. The risk is that some process on the node (e.g. sshd or kubelet) - // is using a port and we give that same port out to a Service. That would - // be bad because iptables would silently claim the traffic but the process - // would never know. - // NOTE: We should not need to have a real listen()ing socket - bind() - // should be enough, but I can't figure out a way to e2e test without - // it. Tools like 'ss' and 'netstat' do not show sockets that are - // bind()ed but not listen()ed, and at least the default debian netcat - // has no way to avoid about 10 seconds of retries. - var socket closeable - switch hp.protocol { - case "tcp": - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - socket = listener - case "udp": - addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port)) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - socket = conn - default: - return nil, fmt.Errorf("unknown protocol %q", hp.protocol) - } - glog.V(3).Infof("Opened local port %s", hp.String()) - return socket, nil -} - // cleanupHostportMap closes obsolete hostports func (h *hostportSyncer) cleanupHostportMap(containerPortMap map[*PortMapping]targetPod) { // compute hostports that are supposed to be open diff --git a/pkg/kubelet/network/hostport/hostport_syncer_test.go b/pkg/kubelet/network/hostport/hostport_syncer_test.go index e0ce2c5820f..b24025ac692 100644 --- a/pkg/kubelet/network/hostport/hostport_syncer_test.go +++ b/pkg/kubelet/network/hostport/hostport_syncer_test.go @@ -17,7 +17,6 @@ limitations under the License. package hostport import ( - "fmt" "net" "reflect" "strings" @@ -27,24 +26,6 @@ import ( utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) -type fakeSocket struct { - port int32 - protocol string - closed bool -} - -func (f *fakeSocket) Close() error { - if f.closed { - return fmt.Errorf("Socket %q.%s already closed!", f.port, f.protocol) - } - f.closed = true - return nil -} - -func openFakeSocket(hp *hostport) (closeable, error) { - return &fakeSocket{hp.port, hp.protocol, false}, nil -} - type ruleMatch struct { hostport int chain string @@ -53,11 +34,12 @@ type ruleMatch struct { func TestOpenPodHostports(t *testing.T) { fakeIPTables := NewFakeIPTables() + fakeOpener := NewFakeSocketManager() h := &hostportSyncer{ hostPortMap: make(map[hostport]closeable), iptables: fakeIPTables, - portOpener: openFakeSocket, + portOpener: fakeOpener.openFakeSocket, } tests := []struct { diff --git a/pkg/kubelet/network/hostport/hostport_test.go b/pkg/kubelet/network/hostport/hostport_test.go new file mode 100644 index 00000000000..7c6d33c6593 --- /dev/null +++ b/pkg/kubelet/network/hostport/hostport_test.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostport + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api/v1" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" +) + +type fakeSocket struct { + port int32 + protocol string + closed bool +} + +func (f *fakeSocket) Close() error { + if f.closed { + return fmt.Errorf("Socket %q.%s already closed!", f.port, f.protocol) + } + f.closed = true + return nil +} + +func NewFakeSocketManager() *fakeSocketManager { + return &fakeSocketManager{mem: make(map[hostport]*fakeSocket)} +} + +type fakeSocketManager struct { + mem map[hostport]*fakeSocket +} + +func (f *fakeSocketManager) openFakeSocket(hp *hostport) (closeable, error) { + if socket, ok := f.mem[*hp]; ok && !socket.closed { + return nil, fmt.Errorf("hostport is occupied") + } + fs := &fakeSocket{hp.port, hp.protocol, false} + f.mem[*hp] = fs + return fs, nil +} + +func TestOpenHostports(t *testing.T) { + opener := NewFakeSocketManager() + testCases := []struct { + podPortMapping *PodPortMapping + expectError bool + }{ + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n0", + }, + false, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n1", + PortMappings: []*PortMapping{ + {HostPort: 80, Protocol: v1.Protocol("TCP")}, + {HostPort: 8080, Protocol: v1.Protocol("TCP")}, + {HostPort: 443, Protocol: v1.Protocol("TCP")}, + }, + }, + false, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n2", + PortMappings: []*PortMapping{ + {HostPort: 80, Protocol: v1.Protocol("TCP")}, + }, + }, + true, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n3", + PortMappings: []*PortMapping{ + {HostPort: 8081, Protocol: v1.Protocol("TCP")}, + {HostPort: 8080, Protocol: v1.Protocol("TCP")}, + }, + }, + true, + }, + { + &PodPortMapping{ + Namespace: "ns1", + Name: "n3", + PortMappings: []*PortMapping{ + {HostPort: 8081, Protocol: v1.Protocol("TCP")}, + }, + }, + false, + }, + } + + for _, tc := range testCases { + mapping, err := openHostports(opener.openFakeSocket, tc.podPortMapping) + if tc.expectError { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + assert.EqualValues(t, len(mapping), len(tc.podPortMapping.PortMappings)) + } +} + +func TestEnsureKubeHostportChains(t *testing.T) { + interfaceName := "cbr0" + builtinChains := []string{"PREROUTING", "OUTPUT"} + jumpRule := "-m comment --comment \"kube hostport portals\" -m addrtype --dst-type LOCAL -j KUBE-HOSTPORTS" + masqRule := "-m comment --comment \"SNAT for localhost access to hostports\" -o cbr0 -s 127.0.0.0/8 -j MASQUERADE" + + fakeIPTables := NewFakeIPTables() + assert.NoError(t, ensureKubeHostportChains(fakeIPTables, interfaceName)) + + _, _, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain("KUBE-HOSTPORTS")) + assert.NoError(t, err) + + _, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.ChainPostrouting) + assert.NoError(t, err) + assert.EqualValues(t, len(chain.rules), 1) + assert.Contains(t, chain.rules[0], masqRule) + + for _, chainName := range builtinChains { + _, chain, err := fakeIPTables.getChain(utiliptables.TableNAT, utiliptables.Chain(chainName)) + assert.NoError(t, err) + assert.EqualValues(t, len(chain.rules), 1) + assert.Contains(t, chain.rules[0], jumpRule) + } + +}