Hold node ports in iptables proxier

This commit is contained in:
Tim Hockin 2015-08-19 23:11:56 -07:00
parent 5087ae6c93
commit 8e503f3814

View File

@ -144,13 +144,31 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
// Proxier is an iptables based proxy for connections between a localhost:lport // Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends. // and services that provide the actual backends.
type Proxier struct { type Proxier struct {
mu sync.Mutex // protects serviceMap mu sync.Mutex // protects the following fields
serviceMap map[proxy.ServicePortName]*serviceInfo serviceMap map[proxy.ServicePortName]*serviceInfo
syncPeriod time.Duration portsMap map[localPort]closeable
iptables utiliptables.Interface
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
MasqueradeAll bool
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
iptables utiliptables.Interface
masqueradeAll bool
}
type localPort struct {
desc string
ip string
port int
protocol string
}
func (lp *localPort) String() string {
return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
}
type closeable interface {
Close() error
} }
// Proxier implements ProxyProvider // Proxier implements ProxyProvider
@ -161,8 +179,7 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock. // An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and // Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails. // will not terminate if a particular iptables call fails.
func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, MasqueradeAll bool) (*Proxier, error) { func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool) (*Proxier, error) {
// Set the route_localnet sysctl we need for // Set the route_localnet sysctl we need for
if err := setSysctl(sysctlRouteLocalnet, 1); err != nil { if err := setSysctl(sysctlRouteLocalnet, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err) return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
@ -178,9 +195,10 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
return &Proxier{ return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo), serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
iptables: ipt, iptables: ipt,
MasqueradeAll: MasqueradeAll, masqueradeAll: masqueradeAll,
}, nil }, nil
} }
@ -241,9 +259,7 @@ func (proxier *Proxier) SyncLoop() {
func() { func() {
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
if err := proxier.syncProxyRules(); err != nil { proxier.syncProxyRules()
glog.Errorf("Failed to sync iptables rules: %v", err)
}
}() }()
} }
} }
@ -259,17 +275,24 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
for i := range allServices { for i := range allServices {
service := &allServices[i] service := &allServices[i]
svcName := types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
}
// if ClusterIP is "None" or empty, skip proxying // if ClusterIP is "None" or empty, skip proxying
if !api.IsServiceIPSet(service) { if !api.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
continue continue
} }
for i := range service.Spec.Ports { for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i] servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} serviceName := proxy.ServicePortName{
NamespacedName: svcName,
Port: servicePort.Name,
}
activeServices[serviceName] = true activeServices[serviceName] = true
info, exists := proxier.serviceMap[serviceName] info, exists := proxier.serviceMap[serviceName]
if exists && proxier.sameConfig(info, service, servicePort) { if exists && proxier.sameConfig(info, service, servicePort) {
@ -308,9 +331,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
} }
} }
if err := proxier.syncProxyRules(); err != nil { proxier.syncProxyRules()
glog.Errorf("Failed to sync iptables rules: %v", err)
}
} }
// OnEndpointsUpdate takes in a slice of updated endpoints. // OnEndpointsUpdate takes in a slice of updated endpoints.
@ -371,9 +392,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
} }
} }
if err := proxier.syncProxyRules(); err != nil { proxier.syncProxyRules()
glog.Errorf("Failed to sync iptables rules: %v", err)
}
} }
// used in OnEndpointsUpdate // used in OnEndpointsUpdate
@ -404,25 +423,26 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string {
hpp := &endpoints[i] hpp := &endpoints[i]
if isValidEndpoint(hpp) { if isValidEndpoint(hpp) {
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
} else {
glog.Warningf("got invalid endpoint: %+v", *hpp)
} }
} }
return result return result
} }
// servicePortToServiceChain takes the ServicePortName for a service and // servicePortChainName takes the ServicePortName for a service and
// returns the associated iptables chain. This is computed by hashing (sha256) // returns the associated iptables chain. This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do // then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
// this because Iptables Chain Names must be <= 28 chars long, and the longer // this because Iptables Chain Names must be <= 28 chars long, and the longer
// they are the harder they are to read. // they are the harder they are to read.
func servicePortToServiceChain(s proxy.ServicePortName, protocol string) utiliptables.Chain { func servicePortChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain {
hash := sha256.Sum256([]byte(s.String() + protocol)) hash := sha256.Sum256([]byte(s.String() + protocol))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain("KUBE-SVC-" + encoded[:16]) return utiliptables.Chain("KUBE-SVC-" + encoded[:16])
} }
// This is the same as servicePortToServiceChain but with the endpoint // This is the same as servicePortChainName but with the endpoint included.
// included. func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain {
func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain {
hash := sha256.Sum256([]byte(s.String() + protocol + endpoint)) hash := sha256.Sum256([]byte(s.String() + protocol + endpoint))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
@ -431,11 +451,11 @@ func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol stri
// This is where all of the iptables-save/restore calls happen. // This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit() // The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held // assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() error { func (proxier *Proxier) syncProxyRules() {
// don't sync rules till we've received services and endpoints // don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return nil return
} }
glog.V(3).Infof("Syncing iptables rules") glog.V(3).Infof("Syncing iptables rules")
@ -444,12 +464,14 @@ func (proxier *Proxier) syncProxyRules() error {
// Link the services chain. // Link the services chain.
for _, chain := range inputChains { for _, chain := range inputChains {
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil { if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil {
return err glog.Errorf("Failed to ensure that chain %s exists: %v", iptablesServicesChain, err)
return
} }
comment := "kubernetes service portals" comment := "kubernetes service portals"
args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)} args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, chain, args...); err != nil { if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, chain, args...); err != nil {
return err glog.Errorf("Failed to ensure that chain %s jumps to %s: %v", chain, iptablesServicesChain, err)
return
} }
} }
// Link the output rules. // Link the output rules.
@ -457,7 +479,8 @@ func (proxier *Proxier) syncProxyRules() error {
comment := "kubernetes service traffic requiring SNAT" comment := "kubernetes service traffic requiring SNAT"
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"} args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"}
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
return err glog.Errorf("Failed to ensure that chain %s obeys MASQUERADE mark: %v", utiliptables.ChainPostrouting, err)
return
} }
} }
@ -491,14 +514,17 @@ func (proxier *Proxier) syncProxyRules() error {
} }
// Accumulate chains to keep. // Accumulate chains to keep.
activeChains := make(map[utiliptables.Chain]bool) // use a map as a set activeChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate new local ports that we have opened.
newLocalPorts := map[localPort]closeable{}
// Build rules for each service. // Build rules for each service.
for name, info := range proxier.serviceMap { for svcName, svcInfo := range proxier.serviceMap {
protocol := strings.ToLower(string(info.protocol)) protocol := strings.ToLower(string(svcInfo.protocol))
// Create the per-service chain, retaining counters if possible. // Create the per-service chain, retaining counters if possible.
svcChain := servicePortToServiceChain(name, protocol) svcChain := servicePortChainName(svcName, protocol)
if chain, ok := existingChains[svcChain]; ok { if chain, ok := existingChains[svcChain]; ok {
writeLine(chainsLines, chain) writeLine(chainsLines, chain)
} else { } else {
@ -509,12 +535,12 @@ func (proxier *Proxier) syncProxyRules() error {
// Capture the clusterIP. // Capture the clusterIP.
args := []string{ args := []string{
"-A", string(iptablesServicesChain), "-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", name.String()), "-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", svcName.String()),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", info.clusterIP.String()), "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"--dport", fmt.Sprintf("%d", info.port), "--dport", fmt.Sprintf("%d", svcInfo.port),
} }
if proxier.MasqueradeAll { if proxier.masqueradeAll {
writeLine(rulesLines, append(args, writeLine(rulesLines, append(args,
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
} }
@ -522,13 +548,36 @@ func (proxier *Proxier) syncProxyRules() error {
"-j", string(svcChain))...) "-j", string(svcChain))...)
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range info.externalIPs { for _, externalIP := range svcInfo.externalIPs {
// If the "external" IP happens to be an IP that is local to this
// machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work).
if local, err := isLocalIP(externalIP); err != nil {
glog.Errorf("can't determine if IP is local, assuming not: %v", err)
} else if local {
lp := localPort{
desc: "externalIP for " + svcName.String(),
ip: externalIP,
port: svcInfo.port,
protocol: protocol,
}
if proxier.portsMap[lp] != nil {
newLocalPorts[lp] = proxier.portsMap[lp]
} else {
socket, err := openLocalPort(&lp)
if err != nil {
glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err)
continue
}
newLocalPorts[lp] = socket
}
} // We're holding the port, so it's OK to install iptables rules.
args := []string{ args := []string{
"-A", string(iptablesServicesChain), "-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", name.String()), "-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", svcName.String()),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", externalIP), "-d", fmt.Sprintf("%s/32", externalIP),
"--dport", fmt.Sprintf("%d", info.port), "--dport", fmt.Sprintf("%d", svcInfo.port),
} }
// We have to SNAT packets to external IPs. // We have to SNAT packets to external IPs.
writeLine(rulesLines, append(args, writeLine(rulesLines, append(args,
@ -551,14 +600,14 @@ func (proxier *Proxier) syncProxyRules() error {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, ingress := range info.loadBalancerStatus.Ingress { for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
if ingress.IP != "" { if ingress.IP != "" {
args := []string{ args := []string{
"-A", string(iptablesServicesChain), "-A", string(iptablesServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", name.String()), "-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", svcName.String()),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", ingress.IP), "-d", fmt.Sprintf("%s/32", ingress.IP),
"--dport", fmt.Sprintf("%d", info.port), "--dport", fmt.Sprintf("%d", svcInfo.port),
} }
// We have to SNAT packets from external IPs. // We have to SNAT packets from external IPs.
writeLine(rulesLines, append(args, writeLine(rulesLines, append(args,
@ -571,20 +620,38 @@ func (proxier *Proxier) syncProxyRules() error {
// Capture nodeports. If we had more than 2 rules it might be // Capture nodeports. If we had more than 2 rules it might be
// worthwhile to make a new per-service chain for nodeport rules, but // worthwhile to make a new per-service chain for nodeport rules, but
// with just 2 rules it ends up being a waste and a cognitive burden. // with just 2 rules it ends up being a waste and a cognitive burden.
if info.nodePort != 0 { if svcInfo.nodePort != 0 {
// Hold the local port open so no other process can open it
// (because the socket might open but it would never work).
lp := localPort{
desc: "nodePort for " + svcName.String(),
ip: "",
port: svcInfo.nodePort,
protocol: protocol,
}
if proxier.portsMap[lp] != nil {
newLocalPorts[lp] = proxier.portsMap[lp]
} else {
socket, err := openLocalPort(&lp)
if err != nil {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
newLocalPorts[lp] = socket
} // We're holding the port, so it's OK to install iptables rules.
// Nodeports need SNAT. // Nodeports need SNAT.
writeLine(rulesLines, writeLine(rulesLines,
"-A", string(iptablesNodePortsChain), "-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", name.String(), "-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", info.nodePort), "--dport", fmt.Sprintf("%d", svcInfo.nodePort),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark)) "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))
// Jump to the service chain. // Jump to the service chain.
writeLine(rulesLines, writeLine(rulesLines,
"-A", string(iptablesNodePortsChain), "-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", name.String(), "-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", info.nodePort), "--dport", fmt.Sprintf("%d", svcInfo.nodePort),
"-j", string(svcChain)) "-j", string(svcChain))
} }
@ -592,9 +659,9 @@ func (proxier *Proxier) syncProxyRules() error {
// can group rules together. // can group rules together.
endpoints := make([]string, 0) endpoints := make([]string, 0)
endpointChains := make([]utiliptables.Chain, 0) endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range info.endpoints { for _, ep := range svcInfo.endpoints {
endpoints = append(endpoints, ep) endpoints = append(endpoints, ep)
endpointChain := servicePortAndEndpointToServiceChain(name, protocol, ep) endpointChain := servicePortEndpointChainName(svcName, protocol, ep)
endpointChains = append(endpointChains, endpointChain) endpointChains = append(endpointChains, endpointChain)
// Create the endpoint chain, retaining counters if possible. // Create the endpoint chain, retaining counters if possible.
@ -607,13 +674,13 @@ func (proxier *Proxier) syncProxyRules() error {
} }
// First write session affinity rules, if applicable. // First write session affinity rules, if applicable.
if info.sessionAffinityType == api.ServiceAffinityClientIP { if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains { for _, endpointChain := range endpointChains {
writeLine(rulesLines, writeLine(rulesLines,
"-A", string(svcChain), "-A", string(svcChain),
"-m", "comment", "--comment", name.String(), "-m", "comment", "--comment", svcName.String(),
"-m", "recent", "--name", string(endpointChain), "-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", fmt.Sprintf("%d", info.stickyMaxAgeSeconds), "--reap", "--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeSeconds), "--reap",
"-j", string(endpointChain)) "-j", string(endpointChain))
} }
} }
@ -624,7 +691,7 @@ func (proxier *Proxier) syncProxyRules() error {
// Balancing rules in the per-service chain. // Balancing rules in the per-service chain.
args := []string{ args := []string{
"-A", string(svcChain), "-A", string(svcChain),
"-m", "comment", "--comment", name.String(), "-m", "comment", "--comment", svcName.String(),
} }
if i < (n - 1) { if i < (n - 1) {
// Each rule is a probabilistic match. // Each rule is a probabilistic match.
@ -640,7 +707,7 @@ func (proxier *Proxier) syncProxyRules() error {
// Rules in the per-endpoint chain. // Rules in the per-endpoint chain.
args = []string{ args = []string{
"-A", string(endpointChain), "-A", string(endpointChain),
"-m", "comment", "--comment", name.String(), "-m", "comment", "--comment", svcName.String(),
} }
// Handle traffic that loops back to the originator with SNAT. // Handle traffic that loops back to the originator with SNAT.
// Technically we only need to do this if the endpoint is on this // Technically we only need to do this if the endpoint is on this
@ -652,7 +719,7 @@ func (proxier *Proxier) syncProxyRules() error {
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
// Update client-affinity lists. // Update client-affinity lists.
if info.sessionAffinityType == api.ServiceAffinityClientIP { if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
} }
// DNAT to final destination. // DNAT to final destination.
@ -694,7 +761,23 @@ func (proxier *Proxier) syncProxyRules() error {
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
lines := append(chainsLines.Bytes(), rulesLines.Bytes()...) lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
glog.V(3).Infof("Syncing rules: %s", lines) glog.V(3).Infof("Syncing rules: %s", lines)
return proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) err = proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
glog.Errorf("Failed to sync iptables rules: %v", err)
// Revert new local ports.
for k, v := range newLocalPorts {
glog.Errorf("Closing local port %s", k.String())
v.Close()
}
} else {
// Close old local ports and save new ones.
for k, v := range proxier.portsMap {
if newLocalPorts[k] == nil {
v.Close()
}
}
proxier.portsMap = newLocalPorts
}
} }
// Join all words with spaces, terminate with newline and write to buf. // Join all words with spaces, terminate with newline and write to buf.
@ -736,3 +819,58 @@ func getChainLines(table utiliptables.Table, save []byte) map[utiliptables.Chain
} }
return chainsMap return chainsMap
} }
func isLocalIP(ip string) (bool, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return false, err
}
for i := range addrs {
intf, _, err := net.ParseCIDR(addrs[i].String())
if err != nil {
return false, err
}
if net.ParseIP(ip).Equal(intf) {
return true, nil
}
}
return false, nil
}
func openLocalPort(lp *localPort) (closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket closeable
switch lp.protocol {
case "tcp":
listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
if err != nil {
return nil, err
}
socket = listener
case "udp":
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", lp.protocol)
}
glog.V(2).Infof("Opened local port %s", lp.String())
return socket, nil
}