Merge pull request #62003 from m1093782566/fix-nodeport

Automatic merge from submit-queue (batch tested with PRs 63787, 62003). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fix localport open with --nodeport-addresses specified

**What this PR does / why we need it**:

Fix localport open with --nodeport-addresses specified.

**Which issue(s) this PR fixes**:
Fixes #61953

**Special notes for your reviewer**:

@ephur

**Release note**:

```release-note
Services can listen on same host ports on different interfaces with --nodeport-addresses specified
```
This commit is contained in:
Kubernetes Submit Queue 2018-05-14 12:21:12 -07:00 committed by GitHub
commit fc28745535
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 44 deletions

View File

@ -960,32 +960,53 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.NodePort != 0 {
// Hold the local port open so no other process can open it
// (because the socket might open but it would never work).
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: "",
Port: svcInfo.NodePort,
Protocol: protocol,
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
glog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
continue
}
if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
lps := make([]utilproxy.LocalPort, 0)
for address := range addresses {
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
Port: svcInfo.NodePort,
Protocol: protocol,
}
if lp.Protocol == "udp" {
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
}
lps = append(lps, lp)
}
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
if lp.Protocol == "udp" {
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
}
}
replacementPortsMap[lp] = socket
}
replacementPortsMap[lp] = socket
}
if hasEndpoints {

View File

@ -1066,27 +1066,48 @@ func (proxier *Proxier) syncProxyRules() {
}
if svcInfo.NodePort != 0 {
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: "",
Port: svcInfo.NodePort,
Protocol: protocol,
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
glog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
continue
}
if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
lps := make([]utilproxy.LocalPort, 0)
for address := range addresses {
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
Port: svcInfo.NodePort,
Protocol: protocol,
}
if lp.Protocol == "udp" {
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP)
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
}
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules.
lps = append(lps, lp)
}
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
if lp.Protocol == "udp" {
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP)
}
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules.
}
// Nodeports need SNAT, unless they're local.
// ipset call
@ -1137,11 +1158,6 @@ func (proxier *Proxier) syncProxyRules() {
// Build ipvs kernel routes for each node ip address
nodeIPs := make([]net.IP, 0)
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
glog.Errorf("Failed to get node ip address matching nodeport cidr")
continue
}
for address := range addresses {
if !utilproxy.IsZeroCIDR(address) {
nodeIPs = append(nodeIPs, net.ParseIP(address))